Large-scale ingest

We are running Hail on AWS. We are now running into memory limits while ingesting our VCF files into Hail. It would be great if you could help us with the following questions:

  1. How much overhead is needed when writing a bunch of VCF files to a matrix table?

  2. Could you give a rough estimate of how much memory we need to request in order to ingest a 3 TB file set (chromosome 20 of our data set)?

  3. How much memory should we request to ingest the full set, which is 140 TB for now and will grow as we approach and exceed 100K? Does it even make sense to do this all at once?

  4. Are there ways to ingest the VCF files in smaller pieces rather than all at once?

  5. How are you handling Hail ingests of large-scale call sets? We would be very happy to hear about your experience and to get some direction and advice for an ingest at this scale.

Many Thanks,
Matthias

Hi Matthias,
I’d like to better understand what you’re trying to do. Are you doing the following rough pipeline:

mt = hl.import_vcf([file1, file2, file3, ...]) # 140T of files
mt.write('s3://...')

A few questions:

How many files are there?
Where are you running out of memory? (driver vs executor)
Do you see any messages like INFO: ordering unsorted dataset with network shuffle?


You shouldn't really need much memory to import VCFs - Hail will just stream through them. But there could be something bad going on.
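If it turns out you'd rather not ingest everything in one go (your question 4), one pattern is to import the interval-split VCFs in batches, write each batch as its own matrix table, and stack the pieces afterwards. This is a rough sketch only; it assumes Hail 0.2's import_vcf / write / read_matrix_table / union_rows, and the S3 paths, batch size, and file-naming scheme are made up:

import functools
import hail as hl

# Hypothetical list of the interval-split, joint-called VCF paths.
vcf_paths = [f's3://my-bucket/vcfs/region_{i:04d}.vcf.bgz' for i in range(1009)]

batch_size = 100  # arbitrary; tune to your cluster
batches = [vcf_paths[i:i + batch_size]
           for i in range(0, len(vcf_paths), batch_size)]

# Import and write each batch separately, so a failure only costs one batch.
for i, batch in enumerate(batches):
    mt = hl.import_vcf(batch, force_bgz=True)  # force_bgz only matters for .gz-named block-gzipped files
    mt.write(f's3://my-bucket/ingest/part_{i:03d}.mt', overwrite=True)

# Read the pieces back and stack them row-wise; union_rows assumes the
# batches share the same samples (they are joint-called) and cover
# disjoint genomic regions.
parts = [hl.read_matrix_table(f's3://my-bucket/ingest/part_{i:03d}.mt')
         for i in range(len(batches))]
full = functools.reduce(lambda left, right: left.union_rows(right), parts)
full.write('s3://my-bucket/full.mt', overwrite=True)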

Another question: these VCFs are already joint-called and split by genomic interval, right?

Thanks for your help.
We are following the pipeline you describe (import and mt.write).
Our last attempt was with 2 files (about 100 GB each). The files are joint-called and split by genomic region. The full set is 1009 files.
We do see the INFO message you describe.
We have now moved to Hail 0.2.11 and are seeing the following error message, which we do not quite understand:

Traceback (most recent call last):
File "</usr/local/lib/python3.6/site-packages/decorator.py:decorator-gen-920>", line 2, in write
File "/usr/local/lib/python3.6/site-packages/hail/typecheck/check.py", line 561, in wrapper
return original_func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/hail/matrixtable.py", line 2442, in write
Env.backend().execute(MatrixWrite(self._mir, writer))
File "/usr/local/lib/python3.6/site-packages/hail/backend/backend.py", line 94, in execute
self._to_java_ir(ir)))
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python3.6/site-packages/hail/utils/java.py", line 227, in deco
'Error summary: %s' % (deepest, full, hail.version, deepest)) from None
hail.utils.java.FatalError: SparkException: Job 2 cancelled because SparkContext was shut down

Java stack trace:
org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:972)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:970)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:970)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2284)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2191)
at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:121)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:196)
at is.hail.rvd.RVD$.getKeyInfo(RVD.scala:1048)
at is.hail.rvd.RVD.coalesce(RVD.scala:204)
at is.hail.expr.ir.TableRepartition.execute(TableIR.scala:364)
at is.hail.expr.ir.TableRename.execute(TableIR.scala:1481)
at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:764)
at is.hail.expr.ir.CastTableToMatrix.execute(MatrixIR.scala:1696)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:763)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:86)
at is.hail.expr.ir.CompileAndEvaluate$.apply(CompileAndEvaluate.scala:37)
at is.hail.expr.ir.CompileAndEvaluate$.evaluateToJSON(CompileAndEvaluate.scala:14)
at is.hail.expr.ir.CompileAndEvaluate.evaluateToJSON(CompileAndEvaluate.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

Hail version: 0.2.11-cf54f08305d1
Error summary: SparkException: Job 2 cancelled because SparkContext was shut down

This error is probably a Spark OOM error - Spark is very unhelpful with the error messages it gives.

If you're seeing the message about "ordering unsorted dataset with network shuffle" on import_vcf, then this will never work at the scale of data you have. That message appears when the VCFs are not sorted by locus, which forces us to do a distributed sort, and distributed sorts have all sorts of problems in Spark.

I’m making some small changes now to make it possible to see where the keys were out of order.

Can you update to the latest master and run again, cancelling the job when you see that message? The Hail log file should now contain information about the specific loci that were out of order.
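As an aside, if you want to sanity-check locus order yourself before launching a big import, a quick local check like the sketch below can help. This is plain Python, not part of Hail; it only verifies ordering within one (block-)gzipped file and does not check that the contig order matches your reference genome:

import gzip

def vcf_is_locus_sorted(path):
    # Returns False if a contig shows up again after another contig has
    # started, or if a position decreases within a contig.
    seen_contigs = set()
    current_contig = None
    last_pos = -1
    with gzip.open(path, 'rt') as f:  # bgzip files are gzip-readable
        for line in f:
            if line.startswith('#'):
                continue  # skip header lines
            contig, pos = line.split('\t', 2)[:2]
            pos = int(pos)
            if contig != current_contig:
                if contig in seen_contigs:
                    return False  # contig split across separate blocks
                seen_contigs.add(contig)
                current_contig = contig
                last_pos = -1
            if pos < last_pos:
                return False  # position went backwards within a contig
            last_pos = pos
    return True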

We sorted our input files and updated to the latest master.
The ingest now works with the sorted files.
Thanks a lot! This was very helpful for us.

Great! Let us know how the 140T ingest goes!