Export_vcf OutOfMemoryError: Java heap space despite --driver-memory 8g

Hi,

I’m trying a simple import from bed/bim/fam followed by an export to VCF, and it fails with a Java heap space error.

I followed what @danking had already posted, i.e. adding --driver-memory 8g, and this is what’s in my .bashrc:

export SPARK_HOME=/usr/local/spark
export HAIL_HOME=/usr/local/hail
export PATH=$PATH:$HAIL_HOME/bin/
export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$HAIL_HOME/build/distributions/hail-python.zip"
export PYTHONPATH="$PYTHONPATH:$SPARK_HOME/python"
export PYTHONPATH="$PYTHONPATH:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip"
export SPARK_CLASSPATH=$HAIL_HOME/build/libs/hail-all-spark.jar
export JAR_PATH="$HAIL_HOME/build/libs/hail-all-spark.jar"
export PYSPARK_SUBMIT_ARGS="--conf spark.driver.extraClassPath=$JAR_PATH --conf spark.executor.extraClassPath=$JAR_PATH --driver-memory 8g pyspark-shell"

Still, this is what happens:

Python 3.6.6 |Anaconda, Inc.| (default, Jun 28 2018, 17:14:51)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.4.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import hail as hl
   ...: import hail.expr.aggregators as agg
   ...: hl.init()
   ...:
   ...: ds = hl.import_plink(bed="splithh.bed", bim="splithh.bim", fam="splithh.fam", min_partitions=None, delimiter='\\s+', missing='NA', quant_pheno=False, a2_reference=True, reference_genome='default', contig_recoding={'23': 'X', '24': 'Y', '25': 'X', '26': 'MT', 'PAR1': 'X', 'PAR2': 'X'}, skip_invalid_loci=False)
   ...: hl.export_vcf(ds, 'Hail_from_plink1.9_splithh.vcf.bgz', parallel='header_per_shard')
   ...:
   ...:
Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Running on Apache Spark version 2.2.0
SparkUI available at http://192.168.0.60:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version devel-f5ba8f0e4580
NOTE: This is a beta version. Interfaces may change
during the beta period. We recommend pulling
the latest changes weekly.
2018-10-05 13:04:06 Hail: INFO: Found 6518 samples in fam file.
2018-10-05 13:04:06 Hail: INFO: Found 307201 variants in bim file.
[Stage 0:===> (1 + 14) / 15]2018-10-05 13:04:09 Hail: INFO: Ordering unsorted dataset with network shuffle
2018-10-05 13:04:09 Hail: WARN: export_vcf found no row field 'info'. Emitting no INFO fields.
[Stage 2:> (0 + 14) / 15]---------------------------------------------------------------------------
FatalError Traceback (most recent call last)

Hail version: devel-f5ba8f0e4580
Error summary: OutOfMemoryError: Java heap space

I also tried with --driver-memory 12g, same issue.
Any idea?

Can you try updating to the latest build? Your version is 3 months old! If the issue persists, we’ll debug further.

I realize that for other software, 3 months is pretty current, but for us that is an eternity :slight_smile:

No worries, apologies for the delay, I had to re-install quite a few things (it’s now very clean). Unfortunately, same issue:

Python 3.6.6 |Anaconda, Inc.| (default, Jun 28 2018, 17:14:51)
Type 'copyright', 'credits' or 'license' for more information
IPython 7.0.1 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import hail as hl
   ...: import hail.expr.aggregators as agg
   ...: hl.init()
   ...:
   ...:
Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Running on Apache Spark version 2.2.0
SparkUI available at http://192.168.0.83:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version devel-05870d9d222a
NOTE: This is a beta version. Interfaces may change
during the beta period. We recommend pulling
the latest changes weekly.
LOGGING: writing to /root/hail-20181008-0957-devel-05870d9d222a.log

In [6]: ds = hl.import_plink(bed="plink1.9_splithh.bed", bim="plink1.9_splithh.bim", fam="plink1.9_splithh.fam", min_partitions=None, delimiter='\\s+', missing='NA', quant_pheno=False, a2_reference=True, reference_genome='default', contig_recoding={'23': 'X', '24': 'Y', '25': 'X', '26': 'MT', 'PAR1': 'X', 'PAR2': 'X'}, skip_invalid_loci=False)
   ...:
2018-10-08 10:09:32 Hail: INFO: Found 6518 samples in fam file.
2018-10-08 10:09:32 Hail: INFO: Found 307201 variants in bim file.
[Stage 0:=====================================================> (14 + 1) / 15]2018-10-08 10:09:47 Hail: INFO: Ordering unsorted dataset with network shuffle

In [7]: hl.export_vcf(ds, 'plink1.9_splithh.vcf.bgz', parallel='header_per_shard')
   ...:
2018-10-08 10:10:04 Hail: WARN: export_vcf found no row field 'info'. Emitting no INFO fields.
[Stage 2:> (0 + 14) / 15]---------------------------------------------------------------------------
FatalError Traceback (most recent call last)
<ipython-input-...> in <module>
----> 1 hl.export_vcf(ds, '/root/VTE_pre-imputation/Hail_from_plink1.9_splithh.vcf.bgz', parallel='header_per_shard')

<decorator-gen-...> in export_vcf(dataset, output, append_to_header, parallel, metadata)

/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    558     def wrapper(__original_func, *args, **kwargs):
    559         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 560         return __original_func(*args_, **kwargs_)
    561
    562     return wrapper

/usr/local/hail/build/distributions/hail-python.zip/hail/methods/impex.py in export_vcf(dataset, output, append_to_header, parallel, metadata)
    420     Env.hail().io.vcf.ExportVCF.apply(dataset._jvds, output, joption(append_to_header),
    421                                       Env.hail().utils.ExportType.getExportType(parallel),
--> 422                                       joption(typ._convert_to_j(metadata)))
    423
    424

/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134
   1135         for temp_arg in temp_args:

/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py in deco(*args, **kwargs)
    208             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    209                              'Hail version: %s\n'
--> 210                              'Error summary: %s' % (deepest, full, hail.version, deepest)) from None
    211     except pyspark.sql.utils.CapturedException as e:
    212         raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: OutOfMemoryError: Java heap space

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 2.0 failed 1 times, most recent failure: Lost task 6.0 in stage 2.0 (TID 36, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:102)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2075)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1151)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1070)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply$mcV$sp(PairRDDFunctions.scala:1016)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1016)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1016)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1015)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:973)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:971)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:971)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:971)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply$mcV$sp(RDD.scala:1507)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1495)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1495)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1495)
at is.hail.utils.richUtils.RichRDD$.writeTable$extension(RichRDD.scala:66)
at is.hail.io.vcf.ExportVCF$.apply(ExportVCF.scala:424)
at is.hail.io.vcf.ExportVCF.apply(ExportVCF.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:102)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

Hail version: devel-05870d9d222a
Error summary: OutOfMemoryError: Java heap space

My .bashrc:

export SPARK_HOME=/usr/local/spark
export HAIL_HOME=/usr/local/hail
export PYTHONPATH="${PYTHONPATH:+$PYTHONPATH:}$HAIL_HOME/build/distributions/hail-python.zip"
export PYTHONPATH="$PYTHONPATH:$SPARK_HOME/python"
export PYTHONPATH="$PYTHONPATH:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip"

# PYSPARK_SUBMIT_ARGS is used by ipython and jupyter
export PYSPARK_SUBMIT_ARGS="\
  --jars $HAIL_HOME/build/libs/hail-all-spark.jar \
  --conf spark.driver.extraClassPath=\"$HAIL_HOME/build/libs/hail-all-spark.jar\" \
  --conf spark.executor.extraClassPath=./hail-all-spark.jar \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.kryo.registrator=is.hail.kryo.HailKryoRegistrator \
  --driver-memory 8g pyspark-shell"

@hhx037, I believe you have successfully increased the driver memory. However, this operation is not limited by driver memory; it is limited by executor memory. The plink file you’re using is apparently not in reference-genome order (probably because 23 and 25 are both the X chromosome, but are not contiguous). Hail requires that genetic datasets be in reference-genome sorted order (i.e. 1, 2, 3, …, 22, X, Y). Therefore, Hail is using a "Spark shuffle" to sort that plink file. This shuffle operation is apparently using a lot of memory. I do not understand why.

Can you try increasing executor memory by adding --executor-memory 4g to your PYSPARK_SUBMIT_ARGS (it must appear before pyspark-shell)? If your executors have more than 4 GB available, try using even more memory :slight_smile:.
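To double-check that the flags actually reach Spark, you can inspect the live configuration from the Python session; a minimal sketch (hl.spark_context() returns the underlying SparkContext):

import hail as hl

hl.init()
conf = hl.spark_context().getConf()
# Both should echo back the values passed via PYSPARK_SUBMIT_ARGS.
print(conf.get('spark.driver.memory', 'not set'))
print(conf.get('spark.executor.memory', 'not set'))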

Hi @danking

indeed I am using the contig_recoding={'23': 'X', '24': 'Y', '25': 'X', '26': 'MT', 'PAR1': 'X', 'PAR2': 'X'} option because of the way plink handles X and PAR regions.
I’ve tried with up to --executor-memory 12g but it still crashes unfortunately.

Is there a way for me to tell Hail to sort the matrix before doing the export?

There are really only three steps (read, shuffle, write), so there’s no other spot to do the sorting.
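One pattern that at least amortizes the cost is to do the sort once: write the imported dataset to Hail native format (the shuffle happens during the write), then read it back for the export; re-reads see already-sorted data. A sketch using the file names from your first post; the write still needs the same memory, but you only pay for it once:

import hail as hl

ds = hl.import_plink(bed='splithh.bed', bim='splithh.bim', fam='splithh.fam',
                     contig_recoding={'23': 'X', '24': 'Y', '25': 'X',
                                      '26': 'MT', 'PAR1': 'X', 'PAR2': 'X'})
ds.write('splithh.mt')                    # the sorting shuffle happens here, once
ds = hl.read_matrix_table('splithh.mt')   # already in sorted order on re-read
hl.export_vcf(ds, 'splithh.vcf.bgz', parallel='header_per_shard')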

Can you share the hail.log file from one of these runs? It’s usually written to the working directory of the executor process. Can you also share the size of the plink file as displayed by du -sh /path/to/the/file?

I’ve just sent you a link to download the log file.
The bed file I’m trying to convert is 478M

Sorry to re-open this, but I’m still facing memory issues during QC and GWAS, so I’m wondering whether I’m submitting the Spark job the wrong way, or whether I need to build a VM with more memory per core (I’ve got around 9 GB/core).

My submit command:

nohup spark-submit --jars "$HAIL_HOME/build/libs/hail-all-spark.jar" --py-files "$HAIL_HOME/build/distributions/hail-python.zip" --conf spark.driver.extraClassPath="$HAIL_HOME/build/libs/hail-all-spark.jar" --conf spark.executor.extraClassPath=./hail-all-spark.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryo.registrator=is.hail.kryo.HailKryoRegistrator --executor-memory 8g --driver-memory 8g UK10K.py &

error message:

Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
18/11/01 18:40:20 WARN Utils: Your hostname, deloukas-hail resolves to a loopback address: 127.0.0.1; using 192.168.0.76 instead (on interface ens3)
18/11/01 18:40:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Running on Apache Spark version 2.2.0
SparkUI available at http://192.168.0.76:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2-29fbaeaf265e
LOGGING: writing to /mnt/output/sb/regression1/hail-20181101-1840-0.2-29fbaeaf265e.log
2018-11-01 18:40:26 Hail: INFO: Reading table with no type imputation
Loading column 'f0' as type 'str' (type not specified)

[Stage 1:====================================================>(3089 + 1) / 3090]2018-11-01 19:18:20 Hail: INFO: Coerced almost-sorted dataset
2018-11-01 19:18:20 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 2:====================================================>(3089 + 1) / 3090]2018-11-02 10:19:15 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 3:=============================> (1 + 1) / 2]2018-11-02 10:19:38 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 6:> (0 + 50) / 2901]Traceback (most recent call last):
  File "/mnt/output/sb/regression1/UK10K.py", line 28, in <module>
    ds = hl.variant_qc(ds).cache()
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2806, in cache
  File "<decorator-gen-...>", line 2, in persist
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py", line 560, in wrapper
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2845, in persist
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py", line 210, in deco
hail.utils.java.FatalError: OutOfMemoryError: Java heap space

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 6.0 failed 1 times, most recent failure: Lost task 33.0 in stage 6.0 (TID 9329, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:102)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:139)
at is.hail.rvd.RVD$.getKeyInfo(RVD.scala:1063)
at is.hail.rvd.RVD$.makeCoercer(RVD.scala:1127)
at is.hail.rvd.RVD$.coerce(RVD.scala:1085)
at is.hail.rvd.RVD.changeKey(RVD.scala:120)
at is.hail.rvd.RVD.changeKey(RVD.scala:117)
at is.hail.rvd.RVD.enforceKey(RVD.scala:112)
at is.hail.expr.ir.MatrixKeyRowsBy.execute(MatrixIR.scala:1321)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixFilterRows.execute(MatrixIR.scala:511)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapRows.execute(MatrixIR.scala:1352)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapRows.execute(MatrixIR.scala:1352)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.variant.MatrixTable.value$lzycompute(MatrixTable.scala:417)
at is.hail.variant.MatrixTable.value(MatrixTable.scala:415)
at is.hail.variant.MatrixTable.x$9$lzycompute(MatrixTable.scala:422)
at is.hail.variant.MatrixTable.x$9(MatrixTable.scala:422)
at is.hail.variant.MatrixTable.rvd$lzycompute(MatrixTable.scala:422)
at is.hail.variant.MatrixTable.rvd(MatrixTable.scala:422)
at is.hail.variant.MatrixTable.persist(MatrixTable.scala:1211)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:102)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

Hail version: 0.2-29fbaeaf265e
Error summary: OutOfMemoryError: Java heap space

Hi Stephane,
can you share your pipeline? I’m worried about this:

[Stage 2:====================================================>(3089 + 1) / 3090]
2018-11-02 10:19:15 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 3:=============================> (1 + 1) / 2]
2018-11-02 10:19:38 Hail: INFO: Ordering unsorted dataset with network shuffle

Especially the first one. We’ve had a ton of trouble with Spark OOMing while shuffling large datasets.

Hi Tim @tpoterba

sure, here is the whole thing (note that I have now removed .cache() after variant QC; it appears to have crashed at the next step, saving the matrix).

The submission command:

nohup spark-submit --jars "$HAIL_HOME/build/libs/hail-all-spark.jar" --py-files "$HAIL_HOME/build/distributions/hail-python.zip" --conf spark.driver.extraClassPath="$HAIL_HOME/build/libs/hail-all-spark.jar" --conf spark.executor.extraClassPath=./hail-all-spark.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryo.registrator=is.hail.kryo.HailKryoRegistrator --executor-memory 8g --driver-memory 8g UK10K.py &

The script itself:

import hail as hl
import hail.expr.aggregators as agg
hl.init()
from pprint import pprint
from bokeh.io import output_notebook, show, export_png
from bokeh.layouts import gridplot
from bokeh.models import Span
import os

os.chdir("/mnt/output/sb/VTE/MALMO/regressions/regression1")

# Load vcf, use GP field
ds = hl.import_vcf('/mnt/output/sb/UK10K1000GP3.vcfs/*.vcf.bgz', skip_invalid_loci=True)

# Exclude HRC variants
variant_exclusion_table = hl.import_table("/mnt/output/sb/HRC.vcfs/HRC_variants.tsv.bgz", no_header=True, key='f0')
variant_exclusion_table = variant_exclusion_table.key_by('f0')
ds = ds.key_rows_by('rsid')
ds = ds.filter_rows(~hl.is_defined(variant_exclusion_table[ds.rsid]))

# filter out INFO below 0.4
ds = ds.filter_rows(ds.info.INFO>=0.4)

# filter out minor allele count below 6
ds = hl.variant_qc(ds)
ds = ds.filter_rows((ds.variant_qc.AC[0] > 6.0) & (ds.variant_qc.AC[1] > 6.0))

# Annotate with phenotypes
table = (hl.import_table('/mnt/output/sb/PHENOTYPES.final.hail.txt', impute=True).key_by('sampleID'))

def sex2bool(sex):
    return hl.cond(sex == 2, True, False)

def case(pheno):
    return hl.cond(pheno == 1, True, False)

table = table.annotate(is_female = sex2bool(table.sex))
table = table.annotate(pheno1_case = case(table.pheno1))
table = table.annotate(pheno2_case = case(table.pheno2))
table = table.annotate(pheno3_case = case(table.pheno3))
ds = ds.annotate_cols(**table[ds.s])

# Save matrix for future analyses
ds.write('/mnt/output/sb/UK10K1000GP3.vcfs/UK10K1000GP3_QCed_annotated.mt')

# Perform GWAS
gwas = hl.logistic_regression_rows(test='wald',pass_through=[ds.rsid],y=ds.pheno1_case,x=hl.gp_dosage(ds.GP),covariates=[1, ds.is_female, ds.age, ds.weight, ds.PC1, ds.PC2, ds.PC3,ds.PC4,ds.PC5])

# save results to file
gwas = gwas.filter(gwas.fit.converged == True)
gwas = gwas.flatten()
gwas.export('file:///mnt/output/sb/regression1/UK10K1000GP3_pheno1_regression1.tsv.bgz')

Now the latest log:

Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
18/11/04 15:08:43 WARN Utils: Your hostname, d-hail resolves to a loopback address: 127.0.0.1; using 192.168.0.76 instead (on interface ens3)
18/11/04 15:08:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Running on Apache Spark version 2.2.0
SparkUI available at http://192.168.0.76:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2-29fbaeaf265e
LOGGING: writing to /mnt/output/sb/regression1/hail-20181104-1508-0.2-29fbaeaf265e.log
[Stage 0:> (0 + 22) / 22]2018-11-04 15:08:57 Hail: INFO: Reading table with no type imputation
Loading column 'f0' as type 'str' (type not specified)

2018-11-04 15:08:58 Hail: INFO: Reading table to impute column types
2018-11-04 15:08:59 Hail: INFO: Finished type imputation
Loading column 'sampleID' as type 'str' (imputed)
Loading column 'sex' as type 'int32' (imputed)
Loading column 'PC1' as type 'float64' (imputed)
Loading column 'PC2' as type 'float64' (imputed)
Loading column 'PC3' as type 'float64' (imputed)
Loading column 'PC4' as type 'float64' (imputed)
Loading column 'PC5' as type 'float64' (imputed)
Loading column 'age' as type 'float64' (imputed)
Loading column 'weight' as type 'float64' (imputed)
Loading column 'height' as type 'float64' (imputed)
Loading column 'pheno1' as type 'int32' (imputed)
Loading column 'pheno2' as type 'int32' (imputed)
Loading column 'pheno3' as type 'int32' (imputed)
Loading column 'p2' as type 'int32' (imputed)
Loading column 'p3' as type 'int32' (imputed)
[Stage 2:====================================================>(3089 + 1) / 3090]2018-11-04 15:55:05 Hail: INFO: Coerced almost-sorted dataset
2018-11-04 15:55:05 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 3:====================================================>(3089 + 1) / 3090]2018-11-05 08:51:22 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 4:=============================> (1 + 1) / 2]2018-11-05 08:51:50 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 7:> (0 + 46) / 2901]Traceback (most recent call last):
  File "/mnt/output/sb/regression1/UK10K.py", line 48, in <module>
    ds.write('/mnt/output/sb/UK10K1000GP3.vcfs/UK10K1000GP3_QCed_annotated.mt')
  File "<decorator-gen-...>", line 2, in write
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py", line 560, in wrapper
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2148, in write
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py", line 210, in deco
hail.utils.java.FatalError: OutOfMemoryError: Java heap space

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 36 in stage 7.0 failed 1 times, most recent failure: Lost task 36.0 in stage 7.0 (TID 9333, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:139)
at is.hail.rvd.RVD$.getKeyInfo(RVD.scala:1063)
at is.hail.rvd.RVD$.makeCoercer(RVD.scala:1127)
at is.hail.rvd.RVD$.coerce(RVD.scala:1085)
at is.hail.rvd.RVD.changeKey(RVD.scala:120)
at is.hail.rvd.RVD.changeKey(RVD.scala:117)
at is.hail.rvd.RVD.enforceKey(RVD.scala:112)
at is.hail.expr.ir.MatrixKeyRowsBy.execute(MatrixIR.scala:1321)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixFilterRows.execute(MatrixIR.scala:511)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapRows.execute(MatrixIR.scala:1352)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapRows.execute(MatrixIR.scala:1352)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixFilterRows.execute(MatrixIR.scala:511)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixAnnotateColsTable.execute(MatrixIR.scala:1938)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapCols.execute(MatrixIR.scala:1558)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapCols.execute(MatrixIR.scala:1558)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:647)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:57)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:32)
at is.hail.variant.MatrixTable.write(MatrixTable.scala:1237)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

java.lang.OutOfMemoryError: Java heap space
at

Hail version: 0.2-29fbaeaf265e
Error summary: OutOfMemoryError: Java heap space

This:

ds = ds.key_rows_by('rsid')

will trigger a full shuffle. It’s also probably unnecessary; do you want to join with the MT by rsid a lot downstream?

You can run this line:

ds = ds.filter_rows(~hl.is_defined(variant_exclusion_table[ds.rsid]))

without keying by rsid first, and it’ll be a lot faster, since a much smaller fraction of the data needs to be shuffled.
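In other words, the exclusion step of your pipeline becomes (your own lines, minus the key change):

variant_exclusion_table = hl.import_table("/mnt/output/sb/HRC.vcfs/HRC_variants.tsv.bgz",
                                          no_header=True, key='f0')
# ds stays keyed by locus and alleles as imported; the rsid row field
# indexes the (much smaller) exclusion table directly.
ds = ds.filter_rows(~hl.is_defined(variant_exclusion_table[ds.rsid]))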

I removed ds = ds.key_rows_by('rsid') but it’s still crashing (and shuffling) when trying to save the matrix file:

[Stage 2:====================================================>(3089 + 1) / 3090]2018-11-18 11:57:35 Hail: INFO: Coerced almost-sorted dataset
2018-11-18 11:57:36 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 3:=============================> (1 + 1) / 2]2018-11-18 11:57:55 Hail: INFO: Ordering unsorted dataset with network shuffle
2018-11-18 11:57:55 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 4:====================================================>(3089 + 1) / 3090]2018-11-18 13:47:33 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 5:====================================================>(3089 + 1) / 3090]2018-11-18 15:34:20 Hail: INFO: Ordering unsorted dataset with network shuffle
2018-11-18 15:34:22 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 11:> (0 + 46) / 3090]Traceback (most recent call last):
  File "/mnt/output/regression1/UK10K.py", line 50, in <module>
    ds.write('/mnt/output/sb/VTE/MALMO/imputed_genotypes/UK10K1000GP3.vcfs/UK10K1000GP3_QCed_annotated.mt')
  File "<decorator-gen-...>", line 2, in write
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py", line 560, in wrapper
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2148, in write
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py", line 210, in deco
hail.utils.java.FatalError: OutOfMemoryError: Java heap space

Hi @tpoterba and @danking,

hope you had a nice Christmas.

Sorry to bother you again, but I’m still getting hit by memory errors because of some shuffling, and I don’t know what else I can do. Any idea?

[Stage 11:>                                                     (0 + 46) / 3090]Exception in thread "refresh progress" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.spark.ui.ConsoleProgressBar$$anonfun$6.apply(ConsoleProgressBar.scala:85)
        at org.apache.spark.ui.ConsoleProgressBar$$anonfun$6.apply(ConsoleProgressBar.scala:82)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.ui.ConsoleProgressBar.show(ConsoleProgressBar.scala:82)
        at org.apache.spark.ui.ConsoleProgressBar.org$apache$spark$ui$ConsoleProgressBar$$refresh(ConsoleProgressBar.scala:71)
        at org.apache.spark.ui.ConsoleProgressBar$$anon$1.run(ConsoleProgressBar.scala:55)
        at java.util.TimerThread.mainLoop(Timer.java:555)
        at java.util.TimerThread.run(Timer.java:505)
Traceback (most recent call last):
  File "/mnt/output/sb/VTE/MALMO/regressions/regression1/UK10K.py", line 39, in <module>
    ds.write('/mnt/output/sb/VTE/MALMO/imputed_genotypes/UK10K1000GP3.vcfs/UK10K1000GP3_QCed_annotated.mt')
  File "<decorator-gen-782>", line 2, in write
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py", line 560, in wrapper
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2163, in write
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/backend/backend.py", line 25, in interpret
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py", line 210, in deco
hail.utils.java.FatalError: OutOfMemoryError: GC overhead limit exceeded


Hail version: 0.2.5-b9537d16564d
Error summary: OutOfMemoryError: GC overhead limit exceeded

Hmmm, this looks like the driver is running out of memory. That’s odd…

Can we see the full pipeline again? There were a couple of tweaks in the last few comments.

Just emailed the full pipeline to you; let me know if you haven’t received it.
I applied, I think, everything we had discussed.

Hi,

so, it appears the latest memory issues I’ve been experiencing may be linked to the exclusion of rsids that appear in a large list (there are over 13M rsids in this list), which I’ve been performing as follows:

variant_exclusion_table = hl.import_table("HRC_variants.tsv.bgz", no_header=True, key='f0')
variant_exclusion_table = variant_exclusion_table.key_by('f0')
ds = ds.filter_rows(~hl.is_defined(variant_exclusion_table[ds.rsid]))

Any idea?

We’ve been discussing shuffle memory settings and think it might be possible to configure Spark so that it errors a bit less. We’ll update you on that in the next few days as we think it through.

For now, I’d try to do the following:

variant_exclusion_table = hl.import_table("HRC_variants.tsv.bgz", no_header=True, key='f0')
ds = hl.import_bgen(...)  # or hl.read_matrix_table(...)
rows = ds.rows().select('rsid')
rows = rows.annotate(included = ~hl.is_defined(variant_exclusion_table[rows.rsid]))
rows.write(...)

Then the next time, you can read in the result and join on key, which won’t shuffle.
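For example, the next run’s join might look like this (a sketch; the path is a made-up placeholder for whatever you pass to rows.write above):

rows = hl.read_table('hrc_included_rows.ht')
# rows has the same key as ds's rows, so this join requires no shuffle.
ds = ds.filter_rows(rows[ds.row_key].included)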

If this doesn’t work, let me know here.

Hi Tim,

thanks a lot.
I’m a bit hazy on the procedure though… rows is a table with only rsids; then a boolean column named "included" is created, depending on whether each rsid is present in the exclusion table. Have I got this right?

Then I should annotate ds using the rows table, and save ds with only the rows where included is true?
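In code, my understanding is something like this (made-up path):

rows = hl.read_table('hrc_included_rows.ht')   # the table written in the previous step
ds = ds.annotate_rows(included = rows[ds.row_key].included)
ds = ds.filter_rows(ds.included)               # keep only variants absent from the exclusion list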