Ld_prune OutOfMemoryError: Java heap space

Hi,

I am currently evaluating Hail as a new tool for data processing / pre-GWAS QC for our group at UoC.

We currently have a set of 41,388 samples, typed on the UKBB array, for which we have computed relatedness using our standard R pipeline.

We are inputting a set of 44,018 variants which we currently use as the “relatedness/ethnicity” set for this particular array.

We want to implement a Hail pipeline and compare results on this set, but we are running into memory issues with the ld_prune() and pc_relate() methods.

We installed Hail using pip, and we are initialising PySpark with the parameters below:

PYSPARK_SUBMIT_ARGS="--driver-memory 400g --executor-memory 400g pyspark-shell"

Here is a test workflow + output:

>>> hl.init()
20/01/16 10:20:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Running on Apache Spark version 2.4.1
SparkUI available at http://login-l-1.data.cluster:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.30-2ae07d872f43
LOGGING: writing to /rds/project/who1000-1/rds-who1000-cbrc/Affy/projects/AIR/20190416.GWAS_analysis/re_genotyping/analysis_steps.v2/12.pca_on_everyone/salih_variants/hail-20200116-1020-0.2.30-2ae07d872f43.log
>>> mt = hl.read_matrix_table('AIR.hwe_filtered.salih_variants.mt')
>>> mt.count()
(41388, 44018)
>>> mt.describe()
----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh37>
    'alleles': array<str>
    'rsid': str
    'qual': float64
    'filters': set<str>
    'info': struct {
        UNKNOWNPOSITION: bool,
        probeset_id: str,
        AC: array<int32>,
        AN: int32,
        GN: array<int32>,
        HWE: float64,
        F: float64
    }
----------------------------------------
Entry fields:
    'GT': call
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------
>>> mt = mt.filter_rows(hl.len(mt.alleles) == 2)
>>> mt.count()
(41388, 44018)
>>> pruned_variant_table = hl.ld_prune(mt.GT, r2=0.2, bp_window_size=500000)
2020-01-16 10:25:35 Hail: INFO: ld_prune: running local pruning stage with max queue size of 6071 variants
[Stage 2:=====================================================> (213 + 5) / 218]2020-01-16 10:25:51 Hail: INFO: wrote table with 39887 rows in 218 partitions to file:/tmp/hail.JAVHO3q8jSQ0/3CawbE13ry
[Stage 6:====================================================>     (9 + 1) / 10]2020-01-16 10:26:33 Hail: INFO: Wrote all 110 blocks of 39887 x 44018 matrix with block size 4096.
[Stage 10:>                                                        (0 + 2) / 19]Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "</home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/decorator.py:decorator-gen-1414>", line 2, in ld_prune
  File "/home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/hail/typecheck/check.py", line 585, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/hail/methods/statgen.py", line 3430, in ld_prune
    entries.i, entries.j, keep=False, tie_breaker=tie_breaker, keyed=False)
  File "</home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/decorator.py:decorator-gen-1206>", line 2, in maximal_independent_set
  File "/home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/hail/typecheck/check.py", line 585, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/hail/methods/misc.py", line 149, in maximal_independent_set
    edges.write(edges_path)
  File "</home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/decorator.py:decorator-gen-926>", line 2, in write
  File "/home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/hail/typecheck/check.py", line 585, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/hail/table.py", line 1218, in write
    Env.backend().execute(TableWrite(self._tir, TableNativeWriter(output, overwrite, stage_locally, _codec_spec)))
  File "/home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/hail/backend/backend.py", line 109, in execute
    result = json.loads(Env.hc()._jhc.backend().executeJSON(self._to_java_ir(ir)))
  File "/home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/ng384/software/lib/miniconda3/envs/hail/lib/python3.7/site-packages/hail/utils/java.py", line 225, in deco
    'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
hail.utils.java.FatalError: OutOfMemoryError: Java heap space

Java stack trace:
java.lang.RuntimeException: error while applying lowering 'InterpretNonCompilable'
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:26)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:18)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:18)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$apply$1.apply(CompileAndEvaluate.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.CompileAndEvaluate$.apply(CompileAndEvaluate.scala:14)
	at is.hail.backend.Backend$$anonfun$execute$1.apply(Backend.scala:56)
	at is.hail.backend.Backend$$anonfun$execute$1.apply(Backend.scala:56)
	at is.hail.utils.package$.using(package.scala:596)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:10)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:9)
	at is.hail.utils.package$.using(package.scala:596)
	at is.hail.annotations.Region$.scoped(Region.scala:18)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:9)
	at is.hail.backend.Backend.execute(Backend.scala:56)
	at is.hail.backend.Backend.executeJSON(Backend.scala:62)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 10.0 failed 1 times, most recent failure: Lost task 15.0 in stage 10.0 (TID 1553, localhost, executor driver): java.lang.OutOfMemoryError: Java heap space
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
	at breeze.linalg.DenseMatrix$.zeros$mDc$sp(DenseMatrix.scala:345)
	at is.hail.linalg.BlockMatrixMultiplyRDD.compute(BlockMatrix.scala:1666)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at is.hail.utils.richUtils.RichRDD$$anon$1.compute(RichRDD.scala:118)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:223)
	at is.hail.utils.richUtils.RichContextRDD.writePartitions(RichContextRDD.scala:77)
	at is.hail.io.RichContextRDDRegionValue$.writeRows$extension(RichContextRDDRegionValue.scala:205)
	at is.hail.rvd.RVD.write(RVD.scala:711)
	at is.hail.expr.ir.TableValue.write(TableValue.scala:89)
	at is.hail.expr.ir.TableNativeWriter.apply(TableWriter.scala:25)
	at is.hail.expr.ir.Interpret$.run(Interpret.scala:562)
	at is.hail.expr.ir.Interpret$.alreadyLowered(Interpret.scala:54)
	at is.hail.expr.ir.InterpretNonCompilable$.interpretAndCoerce$1(InterpretNonCompilable.scala:16)
	at is.hail.expr.ir.InterpretNonCompilable$.is$hail$expr$ir$InterpretNonCompilable$$rewrite$1(InterpretNonCompilable.scala:53)
	at is.hail.expr.ir.InterpretNonCompilable$.apply(InterpretNonCompilable.scala:58)
	at is.hail.expr.ir.lowering.InterpretNonCompilablePass$.transform(LoweringPass.scala:48)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:13)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:13)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:13)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:11)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass$class.apply(LoweringPass.scala:11)
	at is.hail.expr.ir.lowering.InterpretNonCompilablePass$.apply(LoweringPass.scala:43)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:20)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:18)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:18)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$apply$1.apply(CompileAndEvaluate.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.CompileAndEvaluate$.apply(CompileAndEvaluate.scala:14)
	at is.hail.backend.Backend$$anonfun$execute$1.apply(Backend.scala:56)
	at is.hail.backend.Backend$$anonfun$execute$1.apply(Backend.scala:56)
	at is.hail.utils.package$.using(package.scala:596)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:10)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:9)
	at is.hail.utils.package$.using(package.scala:596)
	at is.hail.annotations.Region$.scoped(Region.scala:18)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:9)
	at is.hail.backend.Backend.execute(Backend.scala:56)
	at is.hail.backend.Backend.executeJSON(Backend.scala:62)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

java.lang.OutOfMemoryError: Java heap space
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
	at breeze.linalg.DenseMatrix$.zeros$mDc$sp(DenseMatrix.scala:345)
	at is.hail.linalg.BlockMatrixMultiplyRDD.compute(BlockMatrix.scala:1666)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at is.hail.utils.richUtils.RichRDD$$anon$1.compute(RichRDD.scala:118)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)





Hail version: 0.2.30-2ae07d872f43
Error summary: OutOfMemoryError: Java heap space

Can you paste the sequence of commands you used to start Python? If your executors really have 400 gigabytes of memory, there is no way that line could fail due to memory: it allocates on the order of tens of megabytes.
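
In the meantime, a quick way to see what the driver and executors actually received (as opposed to what PYSPARK_SUBMIT_ARGS asked for) is to inspect the live Spark configuration after hl.init(). A minimal sketch, assuming hl.spark_context() is used to reach the underlying SparkContext:

import hail as hl

hl.init()

# Inspect the configuration Spark actually started with. If something else
# overrode PYSPARK_SUBMIT_ARGS, it will show up here instead of the 400g
# that was requested. The second argument to get() is a fallback if the
# key is unset.
conf = hl.spark_context().getConf()
print(conf.get('spark.driver.memory', 'not set'))
print(conf.get('spark.executor.memory', 'not set'))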

Something to know: ld_prune is one of the most painful and slowest commands in Hail right now.

Thank you for the quick replies.

Looking deeper into this, the PYSPARK_SUBMIT_ARGS variable was being overwritten by another project's config files when Spark was loaded. We have since dialled back the executor memory, and we no longer receive the error.
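
For anyone who hits the same thing: since our shell environment was being rewritten by other config files, one workaround is to set PYSPARK_SUBMIT_ARGS from inside the Python session itself, before Hail/PySpark is imported, so nothing loaded later can clobber it. A rough sketch; the memory values below are placeholders, not recommendations:

import os

# Must be set before hail/pyspark is imported. The values are illustrative
# placeholders only.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--driver-memory 32g --executor-memory 8g pyspark-shell'
)

import hail as hl
hl.init()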

Hail is performing excellently, and we greatly appreciate the simplified workflows it enables.

Do you have any advice on how to optimise the amount of memory given to drivers and executors?

We mainly test against Dataproc clusters of n1-standard-8s, which allocate 3.75 GB per core. I'm not aware of any more principled way of setting the memory of an executor. We usually run drivers on n1-standard-16s, which have 60 GB of memory; driver memory facilitates large aggregations.

Here you're running in local mode, so something on the order of --driver-memory XG, where X is 4 * the number of cores, should work fine.
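
As a concrete sketch of that rule of thumb (the core-count lookup via multiprocessing.cpu_count() is just one way to get the number, and 4 GB per core is the heuristic above, not a hard rule):

import multiprocessing
import os

# Rule of thumb from above: roughly 4 GB of driver memory per core in local mode.
cores = multiprocessing.cpu_count()
driver_mem_gb = 4 * cores

# As before, this has to be set before hail/pyspark is imported.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    f'--driver-memory {driver_mem_gb}g pyspark-shell'
)

import hail as hl
hl.init()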