Speed/streaming problems with Hail 0.2

Hi,

I’m having difficulty performing a standard logistic regression GWAS on UK Biobank data, which I was previously able to run in ~3 hours with Hail 0.1. When I run a logistic regression GWAS on only two chromosomes using the code below, it rapidly progresses to near completion within 30 minutes, with the progress bar showing:
[Stage 1:===================================================>(2254 + 23) / 2277]

However, the output then stalls and won’t progress further. When I shut the cluster down after 24 hours to avoid incurring additional charges, the job ends with:

WARNING: Job terminated, but output did not finish streaming.

ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [dd6f4cc280714d43b7ed9d6e81e3df6e] entered state [ERROR] while waiting for [DONE].

My code and full output are copied below:

Initialization Code:

gcloud dataproc clusters create cluster-ce1 --zone us-central1-f --master-machine-type n1-highmem-16 --master-boot-disk-size 100 --num-workers 150 --num-preemptible-workers 50 --worker-machine-type n1-highmem-8 --worker-boot-disk-size 100 --image-version=1.2-deb9 --project ukbb-analyses --metadata=JAR=gs://hail-common/builds/0.2/jars/hail-0.2-2d6ba260c70c-Spark-2.2.0.jar,ZIP=gs://hail-common/builds/0.2/python/hail-0.2-2d6ba260c70c.zip,MINICONDA_VERSION=4.4.10 --properties "spark:spark.driver.extraJavaOptions=-Xss4M,spark:spark.executor.extraJavaOptions=-Xss4M,spark:spark.driver.memory=90g,spark:spark.driver.maxResultSize=30g,spark:spark.task.maxFailures=20,spark:spark.kryoserializer.buffer.max=1g,hdfs:dfs.replication=1" --initialization-actions=gs://dataproc-initialization-actions/conda/bootstrap-conda.sh,gs://ukbb_v2/projects/mchaffin/exome_annotation/init_notebook.py

Submit code:

gcloud dataproc jobs submit pyspark --cluster=cluster-ce1 /Users/cemdin/Documents/Broad/Liver/cirrhosisgwas/hail/cirrhosis.hail.py

Job code:

import hail as hc
hc.init()

mt = hc.import_bgen('gs://fc-7d5088b4-7673-45b5-95c2-17ae00a04183/imputed/ukb_imp_chr[20-21]_v3.bgen', sample_file='gs://ukbb_v2/data/ukb7089_imp_chr3_v3_s487395.sample', entry_fields=['GT', 'GP'])

mt = hc.variant_qc(mt)

mt = mt.filter_rows((mt.variant_qc.AF[1] > 0.001) & (mt.variant_qc.AF[1] < 0.999))

pheno_kt = hc.import_table('gs://ukbb_v2/projects/cemdin/cirrhosis/cirrhosis.tsv',
    types={'cirrhosis': hc.tint32, 'anyld': hc.tint32, 'nafld': hc.tint32,
           'age': hc.tint32, 'gender': hc.tint32, 'array': hc.tint32,
           'pc1': hc.tfloat64, 'pc2': hc.tfloat64, 'pc3': hc.tfloat64,
           'pc4': hc.tfloat64, 'pc5': hc.tfloat64},
    delimiter="\t").key_by('eid')
pheno_kt.describe()

mt = mt.annotate_cols(pheno = pheno_kt[mt.s])

gwas = hc.logistic_regression_rows(test='wald',
    y=mt.pheno.cirrhosis,
    x=mt.GT.n_alt_alleles(),
    covariates=[1, mt.pheno.age, mt.pheno.gender, mt.pheno.array, mt.pheno.pc1, mt.pheno.pc2, mt.pheno.pc3, mt.pheno.pc4, mt.pheno.pc5])

gwas.export('gs://ukbb_v2/projects/cemdin/cirrhosis/cirrhosisgwas.chr.20.21.tsv')
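As a sanity check on the thresholds, the `filter_rows` call above keeps variants whose alternate-allele frequency lies strictly between 0.001 and 0.999. The predicate in plain Python (an illustrative sketch, not Hail API) is just:

```python
def passes_maf_filter(alt_af, lo=0.001, hi=0.999):
    """Return True if an alternate allele frequency passes the MAF filter.

    Mirrors the strict inequalities in the Hail expression:
    (mt.variant_qc.AF[1] > 0.001) & (mt.variant_qc.AF[1] < 0.999)
    """
    return lo < alt_af < hi
```

Note that variants sitting exactly on either boundary are excluded, since both comparisons are strict.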

Output:

Waiting for job output…
Running on Apache Spark version 2.2.3
SparkUI available at http://10.128.0.11:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.11-2d6ba260c70c
LOGGING: writing to /tmp/dd6f4cc280714d43b7ed9d6e81e3df6e/hail-20190318-1841-0.2.11-2d6ba260c70c.log
2019-03-18 18:42:20 Hail: INFO: Number of BGEN files parsed: 2
2019-03-18 18:42:20 Hail: INFO: Number of samples in BGEN files: 487409
2019-03-18 18:42:20 Hail: INFO: Number of variants across all BGEN files: 11469565
2019-03-18 18:42:38 Hail: INFO: Reading table with no type imputation
Loading column 'eid' as type 'str' (type not specified)
Loading column 'cirrhosis' as type 'int32' (user-specified)
Loading column 'nafld' as type 'int32' (user-specified)
Loading column 'anyld' as type 'int32' (user-specified)
Loading column 'age' as type 'int32' (user-specified)
Loading column 'gender' as type 'int32' (user-specified)
Loading column 'array' as type 'int32' (user-specified)
Loading column 'pc1' as type 'float64' (user-specified)
Loading column 'pc2' as type 'float64' (user-specified)
Loading column 'pc3' as type 'float64' (user-specified)
Loading column 'pc4' as type 'float64' (user-specified)
Loading column 'pc5' as type 'float64' (user-specified)

[Stage 0:==================================================> (22 + 3) / 25]2019-03-18 18:43:30 Hail: WARN: 82119 of 487409 samples have a missing phenotype or covariate.
2019-03-18 18:43:35 Hail: INFO: logistic_regression_rows: running wald on 405290 samples for response variable y,
with input variable x, and 9 additional covariates…
[Stage 1:=========================> (1177 + 1100) / 2277]

[Stage 1:===================================================>(2254 + 23) / 2277]
WARNING: Job terminated, but output did not finish streaming.
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [dd6f4cc280714d43b7ed9d6e81e3df6e] entered state [ERROR] while waiting for [DONE].

We’ve seen related things before (Sek’s group finds lots of problems!). I believe Mary fixed this by enabling Spark speculation.

Can you try starting your cluster with the following option:

cluster start ... \ 
  --properties spark:spark.speculation=true

Seems to have done the trick!

Thanks

Hi,

Unfortunately the above message was based on a single chromosome. When I tried to run it on the full set of chromosomes, it again progressed rapidly to near completion before stalling at the end. I shut it down after 16 hours.

[Stage 1:================================================>(18255 + 117) / 18257]
WARNING: Job terminated, but output did not finish streaming.

Any other recommendations?

This time I used the following code:

gcloud dataproc clusters create cluster-ce1 --zone us-central1-f --master-machine-type n1-highmem-16 --master-boot-disk-size 100 --num-workers 150 --num-preemptible-workers 50 --worker-machine-type n1-highmem-8 --worker-boot-disk-size 100 --image-version=1.2-deb9 --project ukbb-analyses --metadata=JAR=gs://hail-common/builds/0.2/jars/hail-0.2-2d6ba260c70c-Spark-2.2.0.jar,ZIP=gs://hail-common/builds/0.2/python/hail-0.2-2d6ba260c70c.zip,MINICONDA_VERSION=4.4.10 --properties "spark:spark.driver.extraJavaOptions=-Xss4M,spark:spark.executor.extraJavaOptions=-Xss4M,spark:spark.driver.memory=90g,spark:spark.driver.maxResultSize=30g,spark:spark.speculation=true,spark:spark.task.maxFailures=20,spark:spark.kryoserializer.buffer.max=1g,hdfs:dfs.replication=1" --initialization-actions=gs://dataproc-initialization-actions/conda/bootstrap-conda.sh,gs://ukbb_v2/projects/mchaffin/exome_annotation/init_notebook.py

gcloud dataproc jobs submit pyspark --cluster=cluster-ce1 /Users/cemdin/Documents/Broad/Liver/cirrhosisgwas/hail/cirrhosis.hail.py

import hail as hc

hc.init()

mt = hc.import_bgen('gs://fc-7d5088b4-7673-45b5-95c2-17ae00a04183/imputed/ukb_imp_chr[1-9]*_v3.bgen', sample_file='gs://ukbb_v2/data/ukb7089_imp_chr3_v3_s487395.sample', entry_fields=['GT', 'GP'])

mt = hc.variant_qc(mt)

mt = mt.filter_rows((mt.variant_qc.AF[1] > 0.001) & (mt.variant_qc.AF[1] < 0.999))

pheno_kt = hc.import_table('gs://ukbb_v2/projects/cemdin/cirrhosis/cirrhosis.tsv',
    types={'cirrhosis': hc.tint32, 'anyld': hc.tint32, 'nafld': hc.tint32,
           'age': hc.tint32, 'gender': hc.tint32, 'array': hc.tint32,
           'pc1': hc.tfloat64, 'pc2': hc.tfloat64, 'pc3': hc.tfloat64,
           'pc4': hc.tfloat64, 'pc5': hc.tfloat64},
    delimiter="\t").key_by('eid')

pheno_kt.describe()

mt = mt.annotate_cols(pheno = pheno_kt[mt.s])

gwas = hc.logistic_regression_rows(test='wald',
    y=mt.pheno.cirrhosis,
    x=mt.GT.n_alt_alleles(),
    covariates=[1, mt.pheno.age, mt.pheno.gender, mt.pheno.array, mt.pheno.pc1, mt.pheno.pc2, mt.pheno.pc3, mt.pheno.pc4, mt.pheno.pc5])

gwas.export('gs://ukbb_v2/projects/cemdin/cirrhosis/cirrhosisgwas.tsv')

This is super weird. Completed + in-progress is greater than the number of total tasks!

It also looks like this warning is coming from the Google Cloud SDK’s Dataproc client library (google-cloud-sdk/lib/googlecloudsdk/api_lib/dataproc/util.py).

I think it may be worth opening a support ticket here.

Also – does the file gs://ukbb_v2/projects/cemdin/cirrhosis/cirrhosisgwas.tsv exist?
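If it helps, a quick way to check from the command line (a sketch, assuming gsutil is authenticated for the bucket; the path is the export target from the script above):

```shell
# 'gsutil stat' exits 0 if the object exists and non-zero otherwise,
# without listing the whole bucket.
gsutil -q stat gs://ukbb_v2/projects/cemdin/cirrhosis/cirrhosisgwas.tsv \
  && echo "exists" || echo "missing"
```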

Yes the file exists.

Submitting spark.speculation and spark.speculation.quantile with the job itself appears to have solved the problem (or it just randomly started working).

Specifically, I’ve got it working using the following submit commands:

gcloud dataproc clusters create cluster-ce --zone us-central1-f --master-machine-type n1-highmem-16 --master-boot-disk-size 100 --num-workers 50 --num-preemptible-workers 150 --worker-machine-type n1-highmem-8 --worker-boot-disk-size 100 --image-version=1.2-deb9 --project ukbb-analyses --metadata=JAR=gs://hail-common/builds/0.2/jars/hail-0.2-2d6ba260c70c-Spark-2.2.0.jar,ZIP=gs://hail-common/builds/0.2/python/hail-0.2-2d6ba260c70c.zip,MINICONDA_VERSION=4.4.10 --properties "spark:spark.driver.extraJavaOptions=-Xss4M,spark:spark.executor.extraJavaOptions=-Xss4M,spark:spark.driver.memory=90g,spark:spark.driver.maxResultSize=30g,spark:spark.speculation=true,spark:spark.task.maxFailures=20,spark:spark.kryoserializer.buffer.max=1g,hdfs:dfs.replication=1" --initialization-actions=gs://dataproc-initialization-actions/conda/bootstrap-conda.sh,gs://ukbb_v2/projects/mchaffin/exome_annotation/init_notebook.py

gcloud dataproc jobs submit pyspark --cluster=cluster-ce /Users/cemdin/Documents/Broad/Liver/cirrhosisgwas/hail/cirrhosis.hail.py --properties spark.memory.fraction=0.33,spark.speculation=true,spark.speculation.quantile=0.95

Thank you for your help.

We may still want to submit a ticket; this seems bad.

It looks like the job may have finished successfully, but the Dataproc cleanup failed.