Export_vcf OutOfMemoryError: Java heap space despite --driver-memory 8g


#8

There are really only three steps (read, shuffle, write), so there’s no other spot where the sorting could happen.

Can you share the hail.log file from one of these runs? It’s usually written to the working directory of the executor process. Can you also share the size of the plink file as displayed by du -sh /path/to/the/file?


#9

I’ve just sent you a link to download the log file.
The bed file I’m trying to convert is 478M.


#10

Sorry to re-open this, but I’m still facing memory issues during QC and GWAS, so I’m wondering if I’m submitting the Spark job the wrong way, or if I need to build a VM with more memory per core (I’ve got around 9 GB/core).

My submit command:

nohup spark-submit --jars "$HAIL_HOME/build/libs/hail-all-spark.jar" --py-files "$HAIL_HOME/build/distributions/hail-python.zip" --conf spark.driver.extraClassPath="$HAIL_HOME/build/libs/hail-all-spark.jar" --conf spark.executor.extraClassPath=./hail-all-spark.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryo.registrator=is.hail.kryo.HailKryoRegistrator --executor-memory 8g --driver-memory 8g UK10K.py &

error message:

Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
18/11/01 18:40:20 WARN Utils: Your hostname, deloukas-hail resolves to a loopback address: 127.0.0.1; using 192.168.0.76 instead (on interface ens3)
18/11/01 18:40:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Running on Apache Spark version 2.2.0
SparkUI available at http://192.168.0.76:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2-29fbaeaf265e
LOGGING: writing to /mnt/output/sb/regression1/hail-20181101-1840-0.2-29fbaeaf265e.log
2018-11-01 18:40:26 Hail: INFO: Reading table with no type imputation
Loading column 'f0' as type 'str' (type not specified)

[Stage 1:====================================================>(3089 + 1) / 3090]
2018-11-01 19:18:20 Hail: INFO: Coerced almost-sorted dataset
2018-11-01 19:18:20 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 2:====================================================>(3089 + 1) / 3090]
2018-11-02 10:19:15 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 3:=============================> (1 + 1) / 2]
2018-11-02 10:19:38 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 6:> (0 + 50) / 2901]
Traceback (most recent call last):
File "/mnt/output/sb/regression1/UK10K.py", line 28, in <module>
ds = hl.variant_qc(ds).cache()
File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2806, in cache
File "<decorator-gen-...>", line 2, in persist
File "/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py", line 560, in wrapper
File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2845, in persist
File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py", line 210, in deco
hail.utils.java.FatalError: OutOfMemoryError: Java heap space

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 33 in stage 6.0 failed 1 times, most recent failure: Lost task 33.0 in stage 6.0 (TID 9329, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anonfun$5.apply(ShuffleBlockFetcherIterator.scala:390)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply$mcJ$sp(Utils.scala:342)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$$anonfun$copyStream$1.apply(Utils.scala:327)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.util.Utils$.copyStream(Utils.scala:348)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:395)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:59)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:199)
at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:102)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:139)
at is.hail.rvd.RVD$.getKeyInfo(RVD.scala:1063)
at is.hail.rvd.RVD$.makeCoercer(RVD.scala:1127)
at is.hail.rvd.RVD$.coerce(RVD.scala:1085)
at is.hail.rvd.RVD.changeKey(RVD.scala:120)
at is.hail.rvd.RVD.changeKey(RVD.scala:117)
at is.hail.rvd.RVD.enforceKey(RVD.scala:112)
at is.hail.expr.ir.MatrixKeyRowsBy.execute(MatrixIR.scala:1321)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixFilterRows.execute(MatrixIR.scala:511)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapRows.execute(MatrixIR.scala:1352)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapRows.execute(MatrixIR.scala:1352)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.variant.MatrixTable.value$lzycompute(MatrixTable.scala:417)
at is.hail.variant.MatrixTable.value(MatrixTable.scala:415)
at is.hail.variant.MatrixTable.x$9$lzycompute(MatrixTable.scala:422)
at is.hail.variant.MatrixTable.x$9(MatrixTable.scala:422)
at is.hail.variant.MatrixTable.rvd$lzycompute(MatrixTable.scala:422)
at is.hail.variant.MatrixTable.rvd(MatrixTable.scala:422)
at is.hail.variant.MatrixTable.persist(MatrixTable.scala:1211)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

Hail version: 0.2-29fbaeaf265e
Error summary: OutOfMemoryError: Java heap space


#11

Hi Stephane,
can you share your pipeline? I’m worried about this:

[Stage 2:====================================================>(3089 + 1) / 3090]
2018-11-02 10:19:15 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 3:=============================> (1 + 1) / 2]
2018-11-02 10:19:38 Hail: INFO: Ordering unsorted dataset with network shuffle

Especially the first one. We’ve had a ton of trouble with Spark OOMing while shuffling large datasets.
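For reference, the OOM in your trace happens while fetching shuffle blocks (the ByteBuffer.allocate inside ShuffleBlockFetcherIterator), and those fetch buffers can sometimes be shrunk with stock Spark settings. Here’s a rough sketch of wiring such settings into a Hail 0.2 script; these are ordinary Spark knobs with illustrative values, not a fix we’ve verified:

import hail as hl
from pyspark import SparkConf, SparkContext

conf = SparkConf()
# Cap the in-flight shuffle fetch buffer (Spark default is 48m); smaller
# values mean smaller allocations in ShuffleBlockFetcherIterator.
conf.set('spark.reducer.maxSizeInFlight', '12m')
# Give less of the heap to Spark's unified memory region (default 0.6),
# leaving more headroom for other allocations.
conf.set('spark.memory.fraction', '0.5')

sc = SparkContext(conf=conf)
hl.init(sc=sc)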


#12

Hi Tim @tpoterba

Sure, here is the whole thing (note that I’ve now removed .cache() after variant QC; it then crashed at the next step, saving the matrix).

The submission command:

nohup spark-submit --jars "$HAIL_HOME/build/libs/hail-all-spark.jar" --py-files "$HAIL_HOME/build/distributions/hail-python.zip" --conf spark.driver.extraClassPath="$HAIL_HOME/build/libs/hail-all-spark.jar" --conf spark.executor.extraClassPath=./hail-all-spark.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.kryo.registrator=is.hail.kryo.HailKryoRegistrator --executor-memory 8g --driver-memory 8g UK10K.py &

The script itself:

import hail as hl
import hail.expr.aggregators as agg
hl.init()
from pprint import pprint
from bokeh.io import output_notebook, show, export_png
from bokeh.layouts import gridplot
from bokeh.models import Span
import os

os.chdir("/mnt/output/sb/VTE/MALMO/regressions/regression1")

# Load vcf, use GP field
ds = hl.import_vcf('/mnt/output/sb/UK10K1000GP3.vcfs/*.vcf.bgz', skip_invalid_loci=True)

# Exclude HRC variants
variant_exclusion_table = hl.import_table("/mnt/output/sb/HRC.vcfs/HRC_variants.tsv.bgz", no_header=True, key='f0')
variant_exclusion_table = variant_exclusion_table.key_by('f0')
ds = ds.key_rows_by('rsid')
ds = ds.filter_rows(~hl.is_defined(variant_exclusion_table[ds.rsid]))

# filter out INFO below 0.4
ds = ds.filter_rows(ds.info.INFO>=0.4)

# keep variants with minor allele count above 6 (both allele counts > 6)
ds = hl.variant_qc(ds)
ds = ds.filter_rows((ds.variant_qc.AC[0] > 6.0) & (ds.variant_qc.AC[1] > 6.0))

# Annotate with phenotypes
table = (hl.import_table('/mnt/output/sb/PHENOTYPES.final.hail.txt', impute=True).key_by('sampleID'))

def sex2bool(sex):
    return hl.cond(sex == 2, True, False)

def case(pheno):
    return hl.cond(pheno == 1, True, False)

table = table.annotate(is_female = sex2bool(table.sex))
table = table.annotate(pheno1_case = case(table.pheno1))
table = table.annotate(pheno2_case = case(table.pheno2))
table = table.annotate(pheno3_case = case(table.pheno3))
ds = ds.annotate_cols(**table[ds.s])

# Save matrix for future analyses
ds.write('/mnt/output/sb/UK10K1000GP3.vcfs/UK10K1000GP3_QCed_annotated.mt')

# Perform GWAS
gwas = hl.logistic_regression_rows(test='wald',
                                   y=ds.pheno1_case,
                                   x=hl.gp_dosage(ds.GP),
                                   covariates=[1, ds.is_female, ds.age, ds.weight,
                                               ds.PC1, ds.PC2, ds.PC3, ds.PC4, ds.PC5],
                                   pass_through=[ds.rsid])

# save results to file
gwas = gwas.filter(gwas.fit.converged)
gwas = gwas.flatten()
gwas.export('file:///mnt/output/sb/regression1/UK10K1000GP3_pheno1_regression1.tsv.bgz')

Now the latest log:

Using Spark’s default log4j profile: org/apache/spark/log4j-defaults.properties
18/11/04 15:08:43 WARN Utils: Your hostname, d-hail resolves to a loopback address: 127.0.0.1; using 192.168.0.76 instead (on interface ens3)
18/11/04 15:08:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Running on Apache Spark version 2.2.0
SparkUI available at http://192.168.0.76:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2-29fbaeaf265e
LOGGING: writing to /mnt/output/sb/regression1/hail-20181104-1508-0.2-29fbaeaf265e.log
[Stage 0:> (0 + 22) / 22]
2018-11-04 15:08:57 Hail: INFO: Reading table with no type imputation
Loading column 'f0' as type 'str' (type not specified)

2018-11-04 15:08:58 Hail: INFO: Reading table to impute column types
2018-11-04 15:08:59 Hail: INFO: Finished type imputation
Loading column 'sampleID' as type 'str' (imputed)
Loading column 'sex' as type 'int32' (imputed)
Loading column 'PC1' as type 'float64' (imputed)
Loading column 'PC2' as type 'float64' (imputed)
Loading column 'PC3' as type 'float64' (imputed)
Loading column 'PC4' as type 'float64' (imputed)
Loading column 'PC5' as type 'float64' (imputed)
Loading column 'age' as type 'float64' (imputed)
Loading column 'weight' as type 'float64' (imputed)
Loading column 'height' as type 'float64' (imputed)
Loading column 'pheno1' as type 'int32' (imputed)
Loading column 'pheno2' as type 'int32' (imputed)
Loading column 'pheno3' as type 'int32' (imputed)
Loading column 'p2' as type 'int32' (imputed)
Loading column 'p3' as type 'int32' (imputed)
[Stage 2:====================================================>(3089 + 1) / 3090]
2018-11-04 15:55:05 Hail: INFO: Coerced almost-sorted dataset
2018-11-04 15:55:05 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 3:====================================================>(3089 + 1) / 3090]
2018-11-05 08:51:22 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 4:=============================> (1 + 1) / 2]
2018-11-05 08:51:50 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 7:> (0 + 46) / 2901]
Traceback (most recent call last):
File "/mnt/output/sb/regression1/UK10K.py", line 48, in <module>
ds.write('/mnt/output/sb/UK10K1000GP3.vcfs/UK10K1000GP3_QCed_annotated.mt')
File "<decorator-gen-...>", line 2, in write
File "/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py", line 560, in wrapper
File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2148, in write
File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py", line 210, in deco
hail.utils.java.FatalError: OutOfMemoryError: Java heap space

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 36 in stage 7.0 failed 1 times, most recent failure: Lost task 36.0 in stage 7.0 (TID 9333, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:139)
at is.hail.rvd.RVD$.getKeyInfo(RVD.scala:1063)
at is.hail.rvd.RVD$.makeCoercer(RVD.scala:1127)
at is.hail.rvd.RVD$.coerce(RVD.scala:1085)
at is.hail.rvd.RVD.changeKey(RVD.scala:120)
at is.hail.rvd.RVD.changeKey(RVD.scala:117)
at is.hail.rvd.RVD.enforceKey(RVD.scala:112)
at is.hail.expr.ir.MatrixKeyRowsBy.execute(MatrixIR.scala:1321)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixFilterRows.execute(MatrixIR.scala:511)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapRows.execute(MatrixIR.scala:1352)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapRows.execute(MatrixIR.scala:1352)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixFilterRows.execute(MatrixIR.scala:511)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixAnnotateColsTable.execute(MatrixIR.scala:1938)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapCols.execute(MatrixIR.scala:1558)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.MatrixMapCols.execute(MatrixIR.scala:1558)
at is.hail.expr.ir.MatrixMapGlobals.execute(MatrixIR.scala:1832)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:647)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:57)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:32)
at is.hail.variant.MatrixTable.write(MatrixTable.scala:1237)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

java.lang.OutOfMemoryError: Java heap space
at ...

Hail version: 0.2-29fbaeaf265e
Error summary: OutOfMemoryError: Java heap space


#13

This:

ds = ds.key_rows_by('rsid')

will trigger a full shuffle. It’s also probably unnecessary: do you want to join with the MT by rsid a lot downstream?

You can run this line:

ds = ds.filter_rows(~hl.is_defined(variant_exclusion_table[ds.rsid]))

without keying by rsid first – and it’ll be a lot faster, since a much smaller fraction of the data needs to be shuffled.
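Concretely, here’s a minimal sketch of the non-shuffling version, using the paths from your script above:

import hail as hl
hl.init()

# Exclusion list: one rsid per line, keyed on that single column.
variant_exclusion_table = hl.import_table(
    '/mnt/output/sb/HRC.vcfs/HRC_variants.tsv.bgz', no_header=True, key='f0')

ds = hl.import_vcf('/mnt/output/sb/UK10K1000GP3.vcfs/*.vcf.bgz',
                   skip_invalid_loci=True)

# Join on the rsid field without re-keying the rows: the matrix table keeps
# its (locus, alleles) key, so only the small exclusion table is shuffled.
ds = ds.filter_rows(~hl.is_defined(variant_exclusion_table[ds.rsid]))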


#14

I removed ds = ds.key_rows_by('rsid') but it’s still crashing (and shuffling) when trying to save the matrix file:

[Stage 2:====================================================>(3089 + 1) / 3090]
2018-11-18 11:57:35 Hail: INFO: Coerced almost-sorted dataset
2018-11-18 11:57:36 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 3:=============================> (1 + 1) / 2]
2018-11-18 11:57:55 Hail: INFO: Ordering unsorted dataset with network shuffle
2018-11-18 11:57:55 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 4:====================================================>(3089 + 1) / 3090]
2018-11-18 13:47:33 Hail: INFO: Coerced dataset with out-of-order partitions.
[Stage 5:====================================================>(3089 + 1) / 3090]
2018-11-18 15:34:20 Hail: INFO: Ordering unsorted dataset with network shuffle
2018-11-18 15:34:22 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 11:> (0 + 46) / 3090]
Traceback (most recent call last):
File "/mnt/output/regression1/UK10K.py", line 50, in <module>
ds.write('/mnt/output/sb/VTE/MALMO/imputed_genotypes/UK10K1000GP3.vcfs/UK10K1000GP3_QCed_annotated.mt')
File "<decorator-gen-...>", line 2, in write
File "/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py", line 560, in wrapper
File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2148, in write
File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py", line 210, in deco
hail.utils.java.FatalError: OutOfMemoryError: Java heap space


#15

Hi @tpoterba and @danking,

hope you had a nice Christmas.

Sorry to bother you again, but I’m still getting hit by memory errors because of some shuffling, and I don’t know what else I can do. Any idea?

[Stage 11:>                                                     (0 + 46) / 3090]Exception in thread "refresh progress" java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.spark.ui.ConsoleProgressBar$$anonfun$6.apply(ConsoleProgressBar.scala:85)
        at org.apache.spark.ui.ConsoleProgressBar$$anonfun$6.apply(ConsoleProgressBar.scala:82)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.ui.ConsoleProgressBar.show(ConsoleProgressBar.scala:82)
        at org.apache.spark.ui.ConsoleProgressBar.org$apache$spark$ui$ConsoleProgressBar$$refresh(ConsoleProgressBar.scala:71)
        at org.apache.spark.ui.ConsoleProgressBar$$anon$1.run(ConsoleProgressBar.scala:55)
        at java.util.TimerThread.mainLoop(Timer.java:555)
        at java.util.TimerThread.run(Timer.java:505)
Traceback (most recent call last):
  File "/mnt/output/sb/VTE/MALMO/regressions/regression1/UK10K.py", line 39, in <module>
    ds.write('/mnt/output/sb/VTE/MALMO/imputed_genotypes/UK10K1000GP3.vcfs/UK10K1000GP3_QCed_annotated.mt')
  File "<decorator-gen-782>", line 2, in write
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py", line 560, in wrapper
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/matrixtable.py", line 2163, in write
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/backend/backend.py", line 25, in interpret
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py", line 210, in deco
hail.utils.java.FatalError: OutOfMemoryError: GC overhead limit exceeded


Hail version: 0.2.5-b9537d16564d
Error summary: OutOfMemoryError: GC overhead limit exceeded


#16

Hmmm, this looks like the driver is running out of memory. That’s odd…

Can we see the full pipeline again? There were a couple of tweaks in the last few comments.


#17

Just emailed the full pipeline to you; let me know if you haven’t received it.
I applied, I think, everything we had discussed.


#18

Hi,

so, it appears the latest memory issues I’ve been experiencing may be linked to the exclusion of rsids from a large list (there are over 13M rsids in it), which I’ve been performing as follows:

variant_exclusion_table = hl.import_table("HRC_variants.tsv.bgz", no_header=True, key='f0')
variant_exclusion_table = variant_exclusion_table.key_by('f0')
ds = ds.filter_rows(~hl.is_defined(variant_exclusion_table[ds.rsid]))

Any idea?


#19

We’ve been discussing shuffle memory settings and think it might be possible to configure Spark to error a bit less. I’ll update you on that in the next few days as we think it through.

For now, I’d try to do the following:

variant_exclusion_table = hl.import_table("HRC_variants.tsv.bgz", no_header=True, key='f0')
variant_exclusion_table = variant_exclusion_table.key_by('f0')
ds = hl.import_bgen(...)  # or hl.read_matrix_table(...)
rows = ds.rows().select('rsid')
rows = rows.annotate(included = ~hl.is_defined(variant_exclusion_table[rows.rsid]))
rows.write(...)

Then the next time, you can read in the result and join on key, which won’t shuffle.

If this doesn’t work, let me know here.


#20

Hi Tim,

thanks a lot.
I’m a bit hazy on the procedure, though… rows is a table with only rsids, and then a boolean column named 'included' is created, dependent on whether each rsid is present in the exclusion table. Have I got this right?

Then, I should annotate ds using the rows table, and then save ds with only the rows where included is false?


#21

Table.select (and MatrixTable.select_rows / select_cols) keeps key fields implicitly, so rows = ds.rows().select('rsid') preserves the locus/alleles key.

What I want to do here is to do the expensive and temperamental shuffle join once with a small dataset, save the result, and then use that with the bgen.
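A quick way to see that the key survives (same import as your script; describe just prints the schema):

import hail as hl
hl.init()

ds = hl.import_vcf('/mnt/output/sb/UK10K1000GP3.vcfs/*.vcf.bgz',
                   skip_invalid_loci=True)
rows = ds.rows().select('rsid')
rows.describe()  # key is still ['locus', 'alleles']; 'rsid' is a value field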


#22

OK, I see, but how do I then use it with the bgen?


#23

Then you can read it in and join on key:

mt = hl.import_bgen(...)
inclusion_ht = hl.read_table('inclusion.ht')
mt = mt.filter_rows(inclusion_ht[mt.row_key].included)

Since included is True for variants that are not in the exclusion list, filter_rows keeps exactly the rows you want.


#26

OK… I wasn’t sure if using filter_rows was still the way to go. Thank you, I’ll try that and let you know how it goes.


#27

Unfortunately, it seems the shuffling issue persists even on the smaller table; it crashes when the write is executed (i.e. when saving the table):

variant_exclusion_table = hl.import_table("/mnt/output/HRC.vcfs/HRC_variants.tsv.bgz", no_header=True, key='f0')
variant_exclusion_table = variant_exclusion_table.key_by('f0')
rows = ds.rows().select('rsid')
rows = rows.annotate(included = ~hl.is_defined(variant_exclusion_table[rows.rsid]))
rows.write("/mnt/output/HRC.vcfs/UK10K_inclusion_status.ht")

[Stage 6:>                                                      (0 + 46) / 2901]
Traceback (most recent call last):
  File "/mnt/output/regression1/UK10K.py", line 17, in <module>
    rows.write("/mnt/output/HRC.vcfs/UK10K_inclusion_status.ht")
  File "<decorator-gen-694>", line 2, in write
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/typecheck/check.py", line 560, in wrapper
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/table.py", line 1163, in write
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/backend/backend.py", line 25, in interpret
  File "/usr/local/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/local/hail/build/distributions/hail-python.zip/hail/utils/java.py", line 210, in deco
hail.utils.java.FatalError: OutOfMemoryError: Java heap space