Small MatrixTable hangs on write into Google bucket

We’ve been testing a script that reads in VCFs, applies filters, and writes out a MatrixTable to GCP, using hailctl. The data we’re testing with has approximately 1.5M variants and 20,000 samples. Our script is outlined below.

The script hangs at line 95, mt_sqc2.write('gs://interval-wgs/matrixtable-test/chr20-full-vqslod-filtered.mt', overwrite=True). It takes about an hour to reach this point and then just hangs: the Spark progress bar gets to the last-but-one partition (249 + 1 / 250) and stays there, currently 2.5 hours and counting.

Given this is GCP (64 non-preemptible workers of the default machine type, which I think have 8 cores apiece, i.e. 512 cores), can you suggest what we’re doing wrong?


import os
from datetime import datetime
import yaml
import hail as hl
import re


snp_vqsr_threshold = -0.7418
indel_vqsr_threshold = -1.2499

class Timer(object):
    def __init__(self, message, now = datetime.utcnow):
        self.message = message
        self.now = now

    def __enter__(self):
        self.start = self.now()

    def __exit__(self, *exception):
        duration = self.now() - self.start

        status = "failed"
        if exception == (None, None, None):
            status = "completed"

        print(f"** {self.message} {status} after {duration}")

if __name__ == "__main__":
    # Need to create the Spark cluster first before initialising Hail
    # Define the Hail persistent storage directory
    hl.init(default_reference="GRCh38")

    VQSLOD_snps = hl.import_table("gs://interval-wgs/qc-files/VQSLOD_snps.bgz",
                                  types={"Locus": "locus<GRCh38>", "VQSLOD": hl.tfloat64})
    VQSLOD_indels = hl.import_table("gs://interval-wgs/qc-files/VQSLOD_indels.bgz",
                                    types={"Locus": "locus<GRCh38>", "VQSLOD": hl.tfloat64})
    sample_QC_nonHail = hl.import_table("gs://interval-wgs/qc-files/INTERVAL_WGS_Sample_QC_28-05-2019_fixed_header.txt",
                                        impute=True)

    #  s3folder = 's3a://interval-wgs-pa10/vcfs/'
    #  mt = hail_methods.hail_functions.multi_import_vcf(s3folder)
    with Timer('Read mt and split multialleles'):
        #CHANGE HERE
        #################################################################
        mt = hl.read_matrix_table('gs://interval-wgs/chr20.mt')
        #####################################################################
        mt_split = hl.split_multi_hts(mt, keep_star=False, left_aligned=True)

    #Unfiltered data summary stats:
    with Timer('Unfiltered sample qc and write to google bucket'):
        mt_sqc1_unfiltered = mt_split.annotate_cols(sample_QC_nonHail=sample_QC_nonHail.key_by("ID")[mt_split.s])
        mt_sqc2_unfiltered = hl.sample_qc(mt_sqc1_unfiltered, name='sample_QC_Hail')
        panda_df_unfiltered_table = mt_sqc2_unfiltered.cols().flatten()
        panda_df_unfiltered_table.export('gs://interval-wgs/testrun/sampleQC_unfiltered_pandaDF_mercury.tsv.bgz', header=True)
        #with hl.utils.hadoop_open('gs://interval-wgs/testrun/sampleQC_unfiltered_pandaDF_mercury.tsv', 'w') as f:
        #    panda_df_unfiltered.to_csv(f, index=False, sep="\t", header=True)

    with Timer('Split to snps and indels'):
        mt_snps = hl.filter_alleles(mt_split, lambda allele, _: hl.is_snp(mt_split.alleles[0], allele))
        mt_indels = hl.filter_alleles(mt_split, lambda allele, _: hl.is_indel(mt_split.alleles[0], allele))
    with Timer('Annotate snps and indels with VQSR scores:'):
        mt_snps = mt_snps.annotate_rows(VQSLOD=VQSLOD_snps.key_by("Locus")[mt_snps.locus])
        mt_indels = mt_indels.annotate_rows(VQSLOD=VQSLOD_indels.key_by("Locus")[mt_indels.locus])
    with Timer('Filtering VQSR'):
        mt_snps_vqslod_filtered = mt_snps.filter_rows(mt_snps.VQSLOD.VQSLOD >= snp_vqsr_threshold)
        mt_indels_vqslod_filtered = mt_indels.filter_rows(mt_indels.VQSLOD.VQSLOD >= indel_vqsr_threshold)
    with Timer('Join snps and indels'):
        mt_vqslod_filtered = mt_snps_vqslod_filtered.union_rows(mt_indels_vqslod_filtered)


    with Timer('Sample QC filtering and write to google bucket'):
        mt_sqc1 = mt_vqslod_filtered.annotate_cols(
            sample_QC_nonHail=sample_QC_nonHail.key_by("ID")[mt_vqslod_filtered.s])

        mt_sqc1_filtered = mt_sqc1.filter_cols(
        (mt_sqc1.sample_QC_nonHail.PASS_CHIM == 1) &
        (mt_sqc1.sample_QC_nonHail.PASS_ID == 1) &
        (mt_sqc1.sample_QC_nonHail.PASS_Median_FreeMix == 1) &
        (mt_sqc1.sample_QC_nonHail.PASS_NRD == 1) &
        (mt_sqc1.sample_QC_nonHail.PASS_PIHAT == 1) &
        (mt_sqc1.sample_QC_nonHail.PASS_SampleSwap == 1) &
        (mt_sqc1.sample_QC_nonHail.PASS_Sex == 1) &
        (mt_sqc1.sample_QC_nonHail.PASS_DUP == 1)
        )

        mt_sqc2 = hl.sample_qc(mt_sqc1_filtered, name='sample_QC_Hail')
        with Timer('filtered sample QC to pandas and write to google bucket'):
            panda_df = mt_sqc2.cols().flatten()
            panda_df.export('gs://interval-wgs/testrun/sampleQC_after_step1_pandaDF_mercury2.tsv.bgz', header=True)
            #with hl.utils.hadoop_open('gs://interval-wgs/testrun/sampleQC_after_step1_pandaDF_mercury2.tsv', 'w') as f:
            #    panda_df.to_csv(f, index=False, sep="\t", header=True)


    with Timer('Write sampleqc mt to google bucket'):
        mt_sqc2.write('gs://interval-wgs/matrixtable-test/chr20-full-vqslod-filtered.mt', overwrite=True)

    ##########################################
        #Variant QC
        #mt_sqc2 = hl.read_matrix_table('gs://interval-wgs/matrixtable-test/chr20-full-vqslod-filtered.mt')

    with Timer('Variant QC and write table to google bucket:'):
        mt_sqc_vqc = hl.variant_qc(mt_sqc2, name='variant_QC_Hail')
        mt_sqc_vqc_filtered = mt_sqc_vqc.filter_rows(
            (mt_sqc_vqc.variant_QC_Hail.call_rate >= 0.98) &
            (mt_sqc_vqc.variant_QC_Hail.p_value_hwe >= 10 ** -6))

        mt_sqc_vqc_filtered_rows = mt_sqc_vqc_filtered.rows()
        panda_df_vq = mt_sqc_vqc_filtered_rows.select(mt_sqc_vqc_filtered_rows.variant_QC_Hail).flatten()
        panda_df_vq.export('gs://interval-wgs/testrun/variantQC_sampleQC_pandaDF_mercury2.tsv.bgz', header=True)
        #with hl.utils.hadoop_open('gs://interval-wgs/testrun/variantQC_sampleQC_pandaDF_mercury2.tsv', 'w') as f:
        #   panda_df_vq.to_csv(f, index=False, sep="\t", header=True)

    with Timer('Write variantqc matrixtable to google bucket'):
        mt_sqc_vqc_filtered.write('gs://interval-wgs/matrixtable-test/chr20-full-sampleqc-variantqc-filtered-FINAL.mt', overwrite=True)

A few things:

Hail builds a computational graph. Put another way, it does not eagerly evaluate expressions; it evaluates them lazily. The only expressions that perform any computation are “actions” such as write, export, collect, show, and take.

Moreover, when Hail executes an action, it runs all the code necessary to produce the output, even if some of that code has been run before. Why not cache the results from a previous run? In general, Hail is used with datasets much too large to store in memory, even on a cluster of many machines.
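
For example, in this minimal sketch (paths and the filter are hypothetical), the first three lines only build a graph; each of the two actions at the end then re-runs that entire graph from scratch:

import hail as hl

mt = hl.read_matrix_table('gs://my-bucket/data.mt')   # builds a graph; reads nothing yet
mt = hl.sample_qc(mt)                                 # still no computation
mt = mt.filter_cols(mt.sample_qc.call_rate > 0.97)    # still no computation

mt.cols().export('gs://my-bucket/cols.tsv.bgz')       # action: runs read + QC + filter
mt.write('gs://my-bucket/out.mt', overwrite=True)     # action: runs read + QC + filter again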

Some general comments:

  • never x.export then x.write: always write first; it’s faster and preserves all the information in the matrix table. After writing, you can hl.read_matrix_table(...).export(...); this will be significantly faster than x.export(...); x.write(...) because it only runs x once and writes it to disk.
  • split_multi is a potentially expensive operation due to the ordering changes that might be introduced; I’d recommend writing and reading after split_multi.
  • don’t use filter_alleles if you have a biallelic (post-split-multi) dataset; filter_alleles does bookkeeping to handle multi-allelic sites that is unnecessary in the biallelic setting. For example, this:
mt_snps = hl.filter_alleles(mt_split, lambda allele, _: hl.is_snp(mt_split.alleles[0], allele))

can be:

mt_snps = mt_split.filter_rows(hl.is_snp(mt_split.alleles[0], mt_split.alleles[1]))

  • when a computational graph forks, as it does with mt_snps and mt_indels, you should consider writing and then reading the shared portion of the graph. mt.checkpoint(...) is shorthand for write-then-read.

  • splitting into two datasets, performing separate filtering, and then unioning introduces a lot of bookkeeping, because Hail relies on data ordering to ensure performance. Instead of splitting, I recommend annotating with a “type” (“snp”, “indel”, “other”) and then using Boolean logic to apply different filtering conditions to different types; see the sketch after this list.
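
Putting these suggestions together, a rough sketch of the restructured pipeline might look like the following (the checkpoint/output paths here are hypothetical; the input paths and thresholds are taken from the script above):

import hail as hl

hl.init(default_reference="GRCh38")

# Read, split, and checkpoint once, so the expensive split/shuffle work is
# computed a single time (checkpoint = write followed by read).
mt = hl.read_matrix_table('gs://interval-wgs/chr20.mt')
mt = hl.split_multi_hts(mt, keep_star=False, left_aligned=True)
mt = mt.checkpoint('gs://interval-wgs/checkpoints/chr20-split.mt', overwrite=True)  # hypothetical path

# Annotate each (now biallelic) row with its type instead of splitting into
# separate SNP/indel datasets and unioning them back together.
mt = mt.annotate_rows(
    variant_type=hl.case()
        .when(hl.is_snp(mt.alleles[0], mt.alleles[1]), 'snp')
        .when(hl.is_indel(mt.alleles[0], mt.alleles[1]), 'indel')
        .default('other'))

# Join in the per-type VQSLOD scores (tables as in the original script), then
# pick the relevant score per row.
vqslod_snps = hl.import_table('gs://interval-wgs/qc-files/VQSLOD_snps.bgz',
                              types={'Locus': 'locus<GRCh38>', 'VQSLOD': hl.tfloat64}).key_by('Locus')
vqslod_indels = hl.import_table('gs://interval-wgs/qc-files/VQSLOD_indels.bgz',
                                types={'Locus': 'locus<GRCh38>', 'VQSLOD': hl.tfloat64}).key_by('Locus')
mt = mt.annotate_rows(snp_vqslod=vqslod_snps[mt.locus].VQSLOD,
                      indel_vqslod=vqslod_indels[mt.locus].VQSLOD)
mt = mt.annotate_rows(VQSLOD=hl.case()
                      .when(mt.variant_type == 'snp', mt.snp_vqslod)
                      .when(mt.variant_type == 'indel', mt.indel_vqslod)
                      .or_missing())

# One filter using Boolean logic; no union_rows bookkeeping.
snp_vqsr_threshold, indel_vqsr_threshold = -0.7418, -1.2499
mt = mt.filter_rows(
    ((mt.variant_type == 'snp') & (mt.VQSLOD >= snp_vqsr_threshold)) |
    ((mt.variant_type == 'indel') & (mt.VQSLOD >= indel_vqsr_threshold)))

# Write first, then export from the written copy instead of re-running the pipeline.
mt = mt.checkpoint('gs://interval-wgs/matrixtables/chr20-vqslod-filtered.mt', overwrite=True)  # hypothetical path
mt.cols().flatten().export('gs://interval-wgs/testrun/filtered_cols.tsv.bgz')  # hypothetical path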

These changes should significantly improve the performance of this pipeline. Let us know how it goes!

Thank you very much for this very informative reply, @danking. We have updated our code so that we no longer split our MatrixTables into SNPs and indels, but instead filter based on annotation columns. However, we are still seeing the same peculiar problem in both S3 and Google Cloud when trying to write the MatrixTable out. I think I must be doing something very basic wrong: the script gets stuck right at the stage of writing out the MatrixTable, like so: [Stage 4:=====================================================>(998 + 2) / 1000]
These are my commands:
mt = hl.import_vcf("s3a://intervalwgs/chr20_10495710-11001242_all.g.vcf.bgz",
                   array_elements_required=False, min_partitions=1000, force_bgz=True)
mt.write('s3a://interval-wgs-pa10/checkpoints/chr20-mini-fragment.mt', overwrite=True)
#mt = hl.read_matrix_table('s3a://intervalwgs/chr20.mt')
mt_split = hl.split_multi_hts(mt, keep_star=False, left_aligned=True)

mt_split.write('s3a://interval-wgs-pa10/checkpoints/chr20-mini-split-multi.mt', stage_locally=True, overwrite=True)

mt_split = hl.read_matrix_table('s3a://interval-wgs-pa10/checkpoints/chr20-mini-split-multi.mt')

The first MatrixTable is written in seconds; the second, after the split, still hasn’t completed after an hour of waiting, for a matrix table with 26817 rows and 12354 columns.

Is there something obvious I’m doing wrong in my script?

Can you share the hail log file? Hopefully it has some information about what went wrong.

Hi Dan,
yes, here it is:


WARNING: Dataproc --region flag will become required in January 2020. Please either specify this flag, or set default by running 'gcloud config set dataproc/region ’
ERROR: (gcloud.dataproc.clusters.create) ALREADY_EXISTS: Already exists: Failed to create cluster: Cluster projects/ibd-interval/regions/global/clusters/hail
gcloud dataproc clusters create hail \
    --image-version=1.4-debian9 \
    --properties=spark:spark.driver.maxResultSize=0,spark:spark.task.maxFailures=20,spark:spark.kryoserializer.buffer.max=1g,spark:spark.driver.extraJavaOptions=-Xss4M,spark:spark.executor.extraJavaOptions=-Xss4M,hdfs:dfs.replication=1,dataproc:dataproc.logging.stackdriver.enable=false,dataproc:dataproc.monitoring.stackdriver.enable=false,spark:spark.driver.memory=41g \
    --initialization-actions=gs://hail-common/hailctl/dataproc/0.2.20/init_notebook.py \
    --metadata=^|||^WHEEL=gs://hail-common/hailctl/dataproc/0.2.20/hail-0.2.20-py3-none-any.whl|||PKGS=aiohttp|bokeh>1.1,<1.3|decorator<5|gcsfs==0.2.1|hurry.filesize==0.9|ipykernel<5|nest_asyncio|numpy<2|pandas>0.22,<0.24|parsimonious<0.9|PyJWT|python-json-logger==0.1.11|requests>=2.21.0,<2.21.1|scipy>1.2,<1.4|tabulate==0.8.3|PyYAML \
    --master-machine-type=n1-highmem-8 \
    --master-boot-disk-size=100GB \
    --num-master-local-ssds=0 \
    --num-preemptible-workers=0 \
    --num-worker-local-ssds=0 \
    --num-workers=64 \
    --preemptible-worker-boot-disk-size=40GB \
    --worker-boot-disk-size=40 \
    --worker-machine-type=n1-standard-8 \
    --zone=europe-west2-a \
    --initialization-action-timeout=20m \
    --labels=creator=pa10_sanger_ac_uk
Starting cluster 'hail'...
Traceback (most recent call last):
File "/lustre/scratch119/realdata/mdt3/teams/hgi/hail/interval_wgs_upload/google/.venv/bin/hailctl", line 11, in <module>
sys.exit(main())
File "/lustre/scratch119/realdata/mdt3/teams/hgi/hail/interval_wgs_upload/google/.venv/lib/python3.6/site-packages/hailtop/hailctl/main.py", line 91, in main
cli.main(args)
File "/lustre/scratch119/realdata/mdt3/teams/hgi/hail/interval_wgs_upload/google/.venv/lib/python3.6/site-packages/hailtop/hailctl/dataproc/cli.py", line 106, in main
jmp[args.module].main(args, pass_through_args)
File "/lustre/scratch119/realdata/mdt3/teams/hgi/hail/interval_wgs_upload/google/.venv/lib/python3.6/site-packages/hailtop/hailctl/dataproc/start.py", line 195, in main
sp.check_call(cmd)
File "/software/python-3.6.1/lib/python3.6/subprocess.py", line 291, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command '['gcloud', 'dataproc', 'clusters', 'create', 'hail', '--image-version=1.4-debian9', '--properties=spark:spark.driver.maxResultSize=0,spark:spark.task.maxFailures=20,spark:spark.kryoserializer.buffer.max=1g,spark:spark.driver.extraJavaOptions=-Xss4M,spark:spark.executor.extraJavaOptions=-Xss4M,hdfs:dfs.replication=1,dataproc:dataproc.logging.stackdriver.enable=false,dataproc:dataproc.monitoring.stackdriver.enable=false,spark:spark.driver.memory=41g', '--initialization-actions=gs://hail-common/hailctl/dataproc/0.2.20/init_notebook.py', '--metadata=^|||^WHEEL=gs://hail-common/hailctl/dataproc/0.2.20/hail-0.2.20-py3-none-any.whl|||PKGS=aiohttp|bokeh>1.1,<1.3|decorator<5|gcsfs==0.2.1|hurry.filesize==0.9|ipykernel<5|nest_asyncio|numpy<2|pandas>0.22,<0.24|parsimonious<0.9|PyJWT|python-json-logger==0.1.11|requests>=2.21.0,<2.21.1|scipy>1.2,<1.4|tabulate==0.8.3|PyYAML', '--master-machine-type=n1-highmem-8', '--master-boot-disk-size=100GB', '--num-master-local-ssds=0', '--num-preemptible-workers=0', '--num-worker-local-ssds=0', '--num-workers=64', '--preemptible-worker-boot-disk-size=40GB', '--worker-boot-disk-size=40', '--worker-machine-type=n1-standard-8', '--zone=europe-west2-a', '--initialization-action-timeout=20m', '--labels=creator=pa10_sanger_ac_uk']' returned non-zero exit status 1.
WARNING: Dataproc --region flag will become required in January 2020. Please either specify this flag, or set default by running 'gcloud config set dataproc/region ’
Job [df460c2480934eb89acb237d75ac477d] submitted.
Waiting for job output…
Running on Apache Spark version 2.4.3
SparkUI available at http://hail-m.c.ibd-interval.internal:4042
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.20-dd6c996e7db5
LOGGING: writing to /tmp/df460c2480934eb89acb237d75ac477d/hail-20190902-1603-0.2.20-dd6c996e7db5.log
2019-09-02 16:03:44 Hail: INFO: Reading table with no type imputation
Loading column ‘Locus’ as type ‘locus’ (user-specified)
Loading column ‘VQSLOD’ as type ‘float64’ (user-specified)

2019-09-02 16:03:45 Hail: INFO: Reading table with no type imputation
Loading column ‘Locus’ as type ‘locus’ (user-specified)
Loading column ‘VQSLOD’ as type ‘float64’ (user-specified)

2019-09-02 16:03:45 Hail: INFO: Reading table to impute column types

[Stage 0:> (0 + 8) / 8]
[Stage 0:=============================> (4 + 4) / 8]2019-09-02 16:03:47 Hail: INFO: Finished type imputation
Loading column ‘ID’ as type ‘str’ (imputed)
Loading column ‘SupplierName’ as type ‘str’ (imputed)
Loading column ‘SangerName’ as type ‘str’ (imputed)
Loading column ‘ALIQUOT_LAB1’ as type ‘str’ (imputed)
Loading column ‘GenotypeID’ as type ‘int64’ (imputed)
Loading column ‘GenotypeData’ as type ‘int64’ (imputed)
Loading column ‘Identifier’ as type ‘int32’ (imputed)
Loading column ‘Wgs_RAW_bl’ as type ‘str’ (imputed)
Loading column ‘Wgs_RAW_24m’ as type ‘str’ (imputed)
Loading column ‘Study’ as type ‘str’ (imputed)
Loading column ‘DUP’ as type ‘str’ (imputed)
Loading column ‘Inferred_GWA_ID’ as type ‘int64’ (imputed)
Loading column ‘MatchedIDs’ as type ‘int32’ (imputed)
Loading column ‘PIHAT_01875’ as type ‘str’ (imputed)
Loading column ‘SEX’ as type ‘int32’ (imputed)
Loading column ‘KARYOTYPE’ as type ‘str’ (imputed)
Loading column ‘Depth’ as type ‘float64’ (imputed)
Loading column ‘Median_FreeMix_NPG’ as type ‘float64’ (imputed)
Loading column ‘Median_FreeMix_HGI’ as type ‘float64’ (imputed)
Loading column ‘NRD’ as type ‘float64’ (imputed)
Loading column ‘Missing’ as type ‘int32’ (imputed)
Loading column ‘Het’ as type ‘int32’ (imputed)
Loading column ‘HomAlt’ as type ‘int32’ (imputed)
Loading column ‘HetHomAlt’ as type ‘float64’ (imputed)
Loading column ‘Mean_Chim’ as type ‘float64’ (imputed)
Loading column ‘PASS_ID’ as type ‘int32’ (imputed)
Loading column ‘PASS_DUP’ as type ‘int32’ (imputed)
Loading column ‘PASS_SampleSwap’ as type ‘int32’ (imputed)
Loading column ‘PASS_PIHAT’ as type ‘int32’ (imputed)
Loading column ‘PASS_Sex’ as type ‘int32’ (imputed)
Loading column ‘PASS_Depth’ as type ‘int32’ (imputed)
Loading column ‘PASS_Median_FreeMix’ as type ‘int32’ (imputed)
Loading column ‘PASS_NRD’ as type ‘int32’ (imputed)
Loading column ‘PASS_HetHomAlt’ as type ‘int32’ (imputed)
Loading column ‘PASS_CHIM’ as type ‘int32’ (imputed)

[Stage 2:> (0 + 8) / 1000]
[Stage 2:=====> (106 + 8) / 1000]
[Stage 2:===========================> (537 + 220) / 1000]
[Stage 2:====================================================>(984 + 16) / 1000]
2019-09-02 16:04:04 Hail: INFO: Coerced sorted dataset

[Stage 3:> (0 + 476) / 1000]
[Stage 3:=============> (265 + 476) / 1000]
[Stage 3:============================> (540 + 460) / 1000]
[Stage 3:=====================================================>(998 + 2) / 1000]
2019-09-02 16:04:28 Hail: INFO: wrote matrix table with 26817 rows and 12354 columns in 1000 partitions to gs://interval-wgs/checkpoints/chr20-mini-fragment.mt
** Read mt completed after 0:00:41.153290

[Stage 4:> (0 + 1) / 1]
[Stage 5:> (0 + 476) / 1000]
[Stage 5:========> (154 + 476) / 1000]
[Stage 5:=================> (333 + 476) / 1000]
[Stage 5:=======================> (453 + 476) / 1000]
[Stage 5:=====================================================>(999 + 1) / 1000]
[Stage 5:=====================================================>(999 + 1) / 1000]
[Stage 5:=====================================================>(999 + 1) / 1000]

The log file we usually mean is the Hail log file, which is written to a location set in hl.init() (or chosen automatically if you don’t specify one) and printed after the Hail ASCII art on startup.

Hi, sorry, I didn’t have access to that, but I have it now. I have uploaded it here: hail.log
Thank you,
Pavlos

There is a warning in the Hail docs for mt.write(): “Do not write to a path that is being read from in the same computation.”
What is the definition of a computation? Is a whole bash script considered one computation, or a single Hail command? Could this be related to the problems we are seeing? Thanks.

I don’t think that’s what’s going on here, since that produces other errors (FileNotFoundException).

The unit within which it’s not OK to read/write the same file is a lazy Hail pipeline, broken up by any writes/reads or checkpoints. For instance:

mt = hl.read_matrix_table(file1)
mt = mt.filter_rows(...)
mt = hl.sample_qc(mt)

# NOT OK:
mt.write(file1)

mt = hl.read_matrix_table(file1)
mt = mt.filter_rows(...)
mt = hl.sample_qc(mt)
mt = mt.checkpoint(file2)

# OK:
mt.write(file1)

@pavlos, it appears that Stage 5 starts at

2019-09-03 12:18:39 YarnScheduler: INFO: Adding task set 5.0 with 1000 tasks

and finishes most of the tasks (997 of 1000) at

2019-09-03 12:18:53 TaskSetManager: INFO: Finished task 508.0 in stage 5.0 (TID 2518) in 8317 ms on hail-w-32.c.ibd-interval.internal (executor 74) (997/1000)

Which is about 13 seconds.

Then one minute elapses and many of the executors get removed because they’re idle, including executors 91 and 107 (on workers hail-w-31 and hail-w-43), which should be running the three missing tasks:

# grep '102.0 in stage 5.0' hail.log                                                   
2019-09-03 12:18:39 TaskSetManager: INFO: Starting task 102.0 in stage 5.0 (TID 2112, hail-w-31.c.ibd-interval.internal, executor 91, partition 102, PROCESS_LOCAL, 8309 bytes)
# grep '211.0 in stage 5.0' hail.log 
2019-09-03 12:18:40 TaskSetManager: INFO: Starting task 211.0 in stage 5.0 (TID 2221, hail-w-31.c.ibd-interval.internal, executor 91, partition 211, PROCESS_LOCAL, 8308 bytes)
# grep '300.0 in stage 5.0' hail.log 
2019-09-03 12:18:40 TaskSetManager: INFO: Starting task 300.0 in stage 5.0 (TID 2310, hail-w-43.c.ibd-interval.internal, executor 107, partition 300, PROCESS_LOCAL, 8308 bytes)

I can’t really add any more information here. Spark appears to be hanging on these tasks for unknown reasons. I don’t understand why Spark thinks the executors are idle when they have work scheduled on them.

Can you run other Spark jobs successfully? Are there any unusual log statements from those executors? Did something happen to the nodes hosting those executors? Does anything exist at s3a://interval-wgs-pa10/checkpoints/chr20-mini-split-multi.mt? Did your nodes run out of local disk space?

To be clear, even though you commented out the mt = hl.read_matrix_table(...), you did read 's3a://interval-wgs-pa10/checkpoints/chr20-mini-fragment.mt' and assign it to mt before running the split-multi line, correct? Which is to say, you ran:

mt = hl.read_matrix_table('s3a://interval-wgs-pa10/checkpoints/chr20-mini-fragment.mt')
mt_split = hl.split_multi_hts(mt, keep_star=False, left_aligned=True)
mt_split.write('s3a://interval-wgs-pa10/checkpoints/chr20-mini-split-multi.mt', stage_locally=True, overwrite=True)

Thank you for your replies. There are files listed in the MatrixTable folder for chr20-mini-split-multi.mt, so Hail is writing to the bucket.
We can run other Spark jobs successfully, but I will try to find out more about our settings.

Below is the exact code behind the log file I have sent you, for completeness. It’s similar to the one in the original post; the only difference is that it runs on Google Cloud rather than S3. Just wanted to clarify that.

   mt = hl.import_vcf("gs://interval-wgs/chr20-vcf/chr20_10495710-11001242_all.g.vcf.bgz", array_elements_required=False, min_partitions=1000, force_bgz= True)
        mt.write('gs://interval-wgs/checkpoints/chr20-mini-fragment.mt', overwrite= True)
        mt = hl.read_matrix_table('gs://interval-wgs/checkpoints/chr20-mini-fragment.mt')
        #####################################################################
    with Timer('Split multialleles'):
        mt_split1=hl.split_multi_hts(mt, keep_star=False, left_aligned=True)

    with Timer('Write mt in gs'):
        mt_split1.write('gs://interval-wgs/matrixtables/chr20-mini-split-multi.mt', overwrite=True)

    with Timer('read split matrixtable'):
        mt_split = hl.read_matrix_table('gs://interval-wgs/matrixtables/chr20-mini-split-multi.mt')

Looks right. I don’t understand what the Spark cluster is doing. You’ll need to get more information about the hanging executors. AFAICT, your job takes about 13 seconds for the vast majority of the work.

Thanks again for your help, Dan and Tim.
Following your comments about Spark and the executors becoming idle even when they had tasks scheduled, I searched the Spark documentation and forums, and the code now runs to completion after I set spark.dynamicAllocation.enabled=false.
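
For reference, a minimal sketch of one way to apply that setting programmatically (not taken from the thread; alternatively, add spark:spark.dynamicAllocation.enabled=false to the cluster's --properties at creation time) is to build the SparkContext before Hail starts and hand it to hl.init:

import hail as hl
from pyspark import SparkConf, SparkContext

# Dynamic allocation must be set before the SparkContext is created.
conf = SparkConf().set('spark.dynamicAllocation.enabled', 'false')
sc = SparkContext(conf=conf)
hl.init(sc=sc, default_reference='GRCh38')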


weird…