Google Cloud speed up

Hello,
I have been running a Python Hail script on chromosome 20 that performs sample QC, variant QC, and some basic filtering steps on Google Cloud using 64 workers. It exports tables and writes out MatrixTables at various checkpoints. The script completed in around 5 hours.

When I doubled the number of workers to 128, the process did not speed up; it still took 5 hours. Why do you think doubling the number of workers did not improve the running time on the same dataset?
What parameters would help speed up the process, especially as we are now scaling to chromosome 1 and eventually the whole genome?

This is my current configuration:

```
--image-version=1.4-debian9
--properties=spark:spark.driver.maxResultSize=0,spark:spark.task.maxFailures=20,spark:spark.kryoserializer.buffer.max=1g,spark:spark.driver.extraJavaOptions=-Xss4M,spark:spark.executor.extraJavaOptions=-Xss4M,hdfs:dfs.replication=1,dataproc:dataproc.logging.stackdriver.enable=false,dataproc:dataproc.monitoring.stackdriver.enable=false,spark:spark.driver.memory=41g
--initialization-actions=gs://hail-common/hailctl/dataproc/0.2.20/init_notebook.py
--metadata=^|||^WHEEL=gs://hail-common/hailctl/dataproc/0.2.20/hail-0.2.20-py3-none-any.whl|||PKGS=aiohttp|bokeh>1.1,<1.3|decorator<5|gcsfs==0.2.1|hurry.filesize==0.9|ipykernel<5|nest_asyncio|numpy<2|pandas>0.22,<0.24|parsimonious<0.9|PyJWT|python-json-logger==0.1.11|requests>=2.21.0,<2.21.1|scipy>1.2,<1.4|tabulate==0.8.3|PyYAML
--master-machine-type=n1-highmem-8
--master-boot-disk-size=100GB
--num-master-local-ssds=0
--num-preemptible-workers=0
--num-worker-local-ssds=0
--num-workers=64
--preemptible-worker-boot-disk-size=40GB
--worker-boot-disk-size=40
--worker-machine-type=n1-standard-8
--zone=europe-west2-a
--initialization-action-timeout=20m
--labels=creator=pa10_sanger_ac_uk
```

How many partitions do you see in the Spark UI or Spark progress bar? If you have twice as many workers but too few partitions, many workers will stay idle.
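
If you want to check from Python rather than the UI, here is a minimal sketch; the path below is hypothetical, so substitute whatever MatrixTable your pipeline reads first:

```
import hail as hl

# Hypothetical path: use the MatrixTable your pipeline actually reads.
mt = hl.read_matrix_table('gs://your-bucket/chr20.mt')

# 128 n1-standard-8 workers give roughly 1024 cores; if this number is much
# smaller than that, most of the extra workers will have nothing to do.
print(mt.n_partitions())
```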

Do you see any messages about “ordering unsorted dataset”?

Hi Dan, yes, I can see messages about “ordering unsorted dataset”.
I can now see an improvement in speed since creating the Google Hail cluster from my laptop instead of the institute’s HPC, which is having independent issues that might have contributed to this as well.
My script is currently working OK and can finish the QC analysis for chr2, chr3, and chr4 within 1 hour each.
But when I try to run chr1, the process again gets stuck at the first checkpoint:

```
mt = hl.read_matrix_table(f"{BUCKET}/{CHROMOSOME}.mt")
#mt = hl.read_matrix_table('gs://interval-wgs/checkpoints/chr20-mini-fragment.mt')
####################################################################

print('Split multialleles')
mt_split1 = hl.split_multi_hts(mt, keep_star=False)

print('checkpoint split matrixtable')
mt_split1 = mt_split1.checkpoint(f"{BUCKET}/matrixtables/{CHROMOSOME}-split-multi.mt",
                                 overwrite=True, stage_locally=True)
```

Is there a way to optimise the checkpoint command for larger datasets? I believe it’s a scaling issue here. What are the _partitions and _codec_spec arguments to MatrixTable.write() that I can see in the Hail help? I can’t find much information about them in the Hail docs. Would splitting the MatrixTable into smaller chunks avoid this issue of getting stuck at the checkpoint?

Can you describe how it is “stuck”? How many Spark partitions have completed? How many are left? What is the median task time? (These are all available in the Spark UI: hailctl dataproc connect NAME spark-ui.)

If you’re using GCP and writing to a BUCKET that is a Google Cloud Storage (GCS) bucket, then don’t use stage_locally. That option is primarily useful for long-running pipelines writing to Amazon S3, which does not play well with long-running writes.
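
For the checkpoint above, that just means dropping the flag; a minimal sketch reusing the BUCKET and CHROMOSOME names from your script:

```
mt_split1 = mt_split1.checkpoint(
    f"{BUCKET}/matrixtables/{CHROMOSOME}-split-multi.mt",
    overwrite=True,  # no stage_locally: write directly to GCS
)
```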

What are the symptoms of the scaling issue? write is naively parallelizable. If you want more parallelism, you probably should go back to the source of the mt and recreate it with more partitions.
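
For example, here is a sketch of recreating the MT with more partitions at import time; the paths and partition counts are illustrative only, not a recommendation for your data:

```
import hail as hl

# min_partitions asks import_vcf to produce at least this many partitions.
mt = hl.import_vcf('gs://your-bucket/chr1.vcf.bgz', min_partitions=2000)
mt.write('gs://your-bucket/chr1.mt', overwrite=True)

# Alternatively, repartition an existing MatrixTable (this triggers a shuffle):
# mt = hl.read_matrix_table('gs://your-bucket/chr1.mt').repartition(2000)
```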

Aside:
Hail does data sharding for you; I recommend against manually sharding your data by chromosome.

When I say stuck, I mean the process of writing out the MatrixTable never completes, although I can see the MatrixTable files on Google Storage. The output from Hail looks like this:

```
[Stage 5:======================================================>(794 + 6) / 800]
[Stage 5:======================================================>(795 + 5) / 800]
[Stage 5:======================================================>(796 + 4) / 800]
[Stage 5:======================================================>(797 + 3) / 800]
[Stage 5:======================================================>(797 + 3) / 800]
[Stage 5:======================================================>(797 + 3) / 800]
```

The (797 + 3) / 800 line then repeats with no further progress.

It stays in that stage for hours until I kill the job. In the Hail log I can see the Spark workers becoming idle one by one, and the job does not continue; there is no error.

Thank you for the explanation of stage_locally.

I was not using it in my previous trial of the same script and I had the same error. I decided to use it in the hope that it would help, but it is good to know that I should not use it for Google Storage.

If I understand you correctly, you are suggesting I should repartition the MT with more partitions, or use more partitions when importing the VCFs?

Can you elaborate a bit more on your last aside comment, please, as it might help us? Our VCF files are already split by chromosome, so we have a MatrixTable per chromosome but also a whole-genome MatrixTable. I was testing our scripts on the smaller chromosomes, starting with chr20, chr2, and chr3, and they are working fine. Because of the issue with chr1 I have been reluctant to use the whole-genome MatrixTable. Is clever repartitioning the secret behind using the whole genome?

Using more partitions will not resolve the “stuck” state you’re describing. If you are unhappy with the wall-clock time, you could try using more partitions. Let’s ignore this for now.

You can import all your VCFs into one MT with:

```
import hail as hl

mt = hl.import_vcf('gs://path/to/vcf/chr*.vcf.bgz')
```

If you want to test just one chromosome, use the following. Hail is smart enough not to read the data from the other chromosomes.

```
mt = mt.filter_rows(mt.locus.contig == "22")
```
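
An interval filter expresses the same restriction and makes the intent explicit; a sketch, assuming GRCh37-style contig names as above:

```
mt = hl.filter_intervals(mt, [hl.parse_locus_interval('22')])
```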

We need more information to understand the stuck issue you’re describing. Can you open the Spark UI:

```
hailctl dataproc connect CLUSTER_NAME spark-ui
```

Do you see any “Active” jobs? Does the active job have active tasks? Have any tasks failed? What are the error messages and stack traces for the failed tasks?

If your cluster is going idle, then no work is being done. We have sometimes observed Apache Spark inexplicably not processing the last few partitions. You might investigate “Spark speculative execution” and see if it addresses the issues you’re having with Spark.

Thank you very much @danking for the detailed explanation. This is proving very useful to us.

I am running my Hail cluster now and looking at the Spark UI of the cluster. The code is now again in the ‘stuck’ stage.
My application is still active and running.
There is 1 active stage on the cluster.
798/800 tasks have succeeded; 2 are running.
No failed tasks. There are 2 active tasks in this state.
This is the stderr from one of the active executors’ tasks:

```
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
19/09/18 08:46:16 INFO org.apache.hadoop.io.compress.zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
19/09/18 08:46:16 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/09/18 08:46:16 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/09/18 08:46:16 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/09/18 08:46:16 INFO org.apache.hadoop.io.compress.CodecPool: Got brand-new decompressor [.gz]
19/09/18 08:47:14 INFO root: Index reader cache queries: 14
19/09/18 08:47:14 INFO root: Index reader cache hit rate: 0.5714285714285714
19/09/18 08:47:16 INFO root: Index reader cache queries: 14
19/09/18 08:47:16 INFO root: Index reader cache hit rate: 0.5714285714285714
19/09/18 08:47:16 INFO root: Index reader cache queries: 14
19/09/18 08:47:16 INFO root: Index reader cache hit rate: 0.5714285714285714
```

There isn’t any clear error info in the stderr of the 2 active tasks for me.

Hi Pavlos,
What are the task IDs for the hanging tasks?

It’s possible that there are two partitions with lots of high-allele-count multiallelics that take much longer to split.

Hi Tim and @danking,
To give you an update:
I have tried Dan’s suggestion of setting the Spark property spark.speculation=true:

```
hailctl dataproc submit "${CLUSTER}" --properties spark.speculation=true
```

and it solved the issue: the script for chr1 now completes without any tasks becoming stuck.
I am not clear why Spark would not finish the last couple of tasks without this flag being set to true, but it seems to be working now.
Many thanks for your help again.
Pavlos

Woohoo!

I am not clear why Spark would not finish the last couple of tasks without this flag being set to true, but it seems to be working now.

We are also not sure :confused:

Spark seems to sometimes get stuck with jobs in zombie states. Speculation means that the Spark scheduler will use idle workers toward the end of a stage to run replicate copies of the remaining tasks, and accept the result of whichever replicate finishes first.
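
For reference, speculation has a few related knobs that can be passed through the same --properties mechanism; the extra values below are illustrative only, and spark.speculation=true on its own was enough here:

```
--properties spark.speculation=true,spark.speculation.quantile=0.9,spark.speculation.multiplier=2
```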
