Spark memory error when trying to write a MatrixTable

Hi all—

I am continuing on my quest to run a pheWAS on DNAnexus, and I think I’m getting pretty close. The problem now is a SparkException that I get when I try to save a (massive) MatrixTable with mt.checkpoint(). The error is below (all errors trimmed to meet the post length restriction).

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-8-a5b4aad6cea0> in <module>
----> 1 mt.checkpoint(f'dnax://{my_database}/CASR/phewas/jeremy_mt_1_phenotypes.mt/', overwrite=True)

<decorator-gen-1273> in checkpoint(self, output, overwrite, stage_locally, _codec_spec, _read_if_exists, _intervals, _filter_intervals, _drop_cols, _drop_rows)
...
     29                 raise FatalError('%s\n\nJava stack trace:\n%s\n'
     30                                  'Hail version: %s\n'
---> 31                                  'Error summary: %s' % (deepest, full, hail.__version__, deepest), error_id) from None
     32         except pyspark.sql.utils.CapturedException as e:
     33             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: SparkException: Job aborted due to stage failure: Total size of serialized results of 31 tasks (1033.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 31 tasks (1033.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2001)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1984)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1983)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1983)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1033)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1033)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1033)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2223)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2172)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2161)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at is.hail.backend.spark.SparkBackend.parallelizeAndComputeWithIndex(SparkBackend.scala:286)
	at is.hail.backend.BackendUtils.collectDArray(BackendUtils.scala:28)
	at __C339Compiled.__m340split_ToArray(Emit.scala)
	at __C339Compiled.apply(Emit.scala)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$1.apply$mcJ$sp(CompileAndEvaluate.scala:68)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$1.apply(CompileAndEvaluate.scala:68)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$1.apply(CompileAndEvaluate.scala:68)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:68)
	at is.hail.expr.ir.CompileAndEvaluate$.evalToIR(CompileAndEvaluate.scala:30)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:30)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.is$hail$expr$ir$LowerOrInterpretNonCompilable$$rewrite$1(LowerOrInterpretNonCompilable.scala:67)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.is$hail$expr$ir$LowerOrInterpretNonCompilable$$rewrite$1(LowerOrInterpretNonCompilable.scala:53)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:72)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:16)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:16)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:14)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass$class.apply(LoweringPass.scala:14)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:64)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:15)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:13)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:13)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:47)
	at is.hail.backend.spark.SparkBackend.is$hail$backend$spark$SparkBackend$$_execute(SparkBackend.scala:381)
	at is.hail.backend.spark.SparkBackend$$anonfun$8$$anonfun$apply$4.apply(SparkBackend.scala:417)
	at is.hail.backend.spark.SparkBackend$$anonfun$8$$anonfun$apply$4.apply(SparkBackend.scala:414)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1$$anonfun$apply$1.apply(ExecuteContext.scala:47)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1$$anonfun$apply$1.apply(ExecuteContext.scala:47)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:47)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:46)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
	at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:46)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:275)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:414)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:413)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:413)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)



Hail version: 0.2.78-b17627756568
Error summary: SparkException: Job aborted due to stage failure: Total size of serialized results of 31 tasks (1033.5 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

I am completely out of my depth when it comes to Spark, but a bit of googling led me to this page. Based on that information, I added the following lines to my Spark initialization:

import pyspark

# Raise the cap on the total size of serialized task results returned to the driver
conf = (pyspark.SparkConf()
    .set("spark.driver.maxResultSize", "2g"))
sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)

I was pretty proud of myself, until I re-ran this and got the same error, except with 2 GB instead of 1024 MB (below for reference). I take from this that Hail does pick up the Spark initialization parameters; the limit changed, but the serialized results still outgrew it.
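For reference, here is roughly how I think the configuration should get handed to Hail. This is just a sketch: passing the existing SparkContext via hl.init(sc=...) is my assumption about the cleanest way to make sure Hail sees the same settings, and the values are illustrative.

import pyspark
import hail as hl

# Sketch: raise the cap on serialized results returned to the driver before Hail
# is initialized, then hand Hail the existing SparkContext so it sees the same conf.
conf = (pyspark.SparkConf()
    .set("spark.driver.maxResultSize", "4g"))  # "0" removes the cap entirely
sc = pyspark.SparkContext(conf=conf)
hl.init(sc=sc)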

I’ll try to cut down the size of the MatrixTable (which is a huge pain, because I am running the code manually in a Jupyter notebook and refactoring seems like even more of a pain), but otherwise, suggestions are very much appreciated!

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-9-c356980b32d4> in <module>
----> 1 mt.checkpoint(f'dnax://{my_database}/CASR/phewas/jeremy_mt_1a_phenotypes.mt/', overwrite=True)

<decorator-gen-1273> in checkpoint(self, output, overwrite, stage_locally, _codec_spec, _read_if_exists, _intervals, _filter_intervals, _drop_cols, _drop_rows)

/opt/conda/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper
...

/opt/conda/lib/python3.6/site-packages/hail/backend/py4j_backend.py in deco(*args, **kwargs)
     29                 raise FatalError('%s\n\nJava stack trace:\n%s\n'
     30                                  'Hail version: %s\n'
---> 31                                  'Error summary: %s' % (deepest, full, hail.__version__, deepest), error_id) from None
     32         except pyspark.sql.utils.CapturedException as e:
     33             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: SparkException: Job aborted due to stage failure: Total size of serialized results of 61 tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 61 tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2001)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1984)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1983)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1983)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1033)
	...
is.hail.backend.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:46)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
	at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:46)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:275)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:414)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:413)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:413)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)



Hail version: 0.2.78-b17627756568
Error summary: SparkException: Job aborted due to stage failure: Total size of serialized results of 61 tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)
[root] ERROR: Exception while sending command.
Traceback (most recent call last):
  File "/cluster/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/cluster/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/cluster/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
<ipython-input-14-c356980b32d4> in <module>
----> 1 mt.checkpoint(f'dnax://{my_database}/CASR/phewas/jeremy_mt_1a_phenotypes.mt/', overwrite=True)

<decorator-gen-1273> in checkpoint(self, output, overwrite, stage_locally, _codec_spec, _read_if_exists, _intervals, _filter_intervals, _drop_cols, _drop_rows)

/opt/conda/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper

/opt/conda/lib/python3.6/site-packages/hail/matrixtable.py in checkpoint(self, output, overwrite, stage_locally, _codec_spec, _read_if_exists, _intervals, _filter_intervals, _drop_cols, _drop_rows)
   2500 }"""
   2501         if not _read_if_exists or not hl.hadoop_exists(f'{output}/_SUCCESS'):
-> 2502             self.write(output=output, overwrite=overwrite, stage_locally=stage_locally, _codec_spec=_codec_spec)
   2503         return hl.read_matrix_table(output, _intervals=_intervals, _filter_intervals=_filter_intervals,
   2504                                     _drop_cols=_drop_cols, _drop_rows=_drop_rows)

<decorator-gen-1275> in write(self, output, overwrite, stage_locally, _codec_spec, _partitions, _checkpoint_file)

/opt/conda/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper

/opt/conda/lib/python3.6/site-packages/hail/matrixtable.py in write(self, output, overwrite, stage_locally, _codec_spec, _partitions, _checkpoint_file)
   2542 
   2543         writer = ir.MatrixNativeWriter(output, overwrite, stage_locally, _codec_spec, _partitions, _partitions_type, _checkpoint_file)
-> 2544         Env.backend().execute(ir.MatrixWrite(self._mir, writer))
   2545 
   2546     class _Show:

/opt/conda/lib/python3.6/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
     84         # print(self._hail_package.expr.ir.Pretty.apply(jir, True, -1))
     85         try:
---> 86             result_tuple = self._jhc.backend().executeEncode(jir, stream_codec)
     87             (result, timings) = (result_tuple._1(), result_tuple._2())
     88             value = ir.typ._from_encoding(result)

/cluster/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/conda/lib/python3.6/site-packages/hail/backend/py4j_backend.py in deco(*args, **kwargs)
     13         import pyspark
     14         try:
---> 15             return f(*args, **kwargs)
     16         except py4j.protocol.Py4JJavaError as e:
     17             s = e.java_exception.toString()

/cluster/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    334             raise Py4JError(
    335                 "An error occurred while calling {0}{1}{2}".
--> 336                 format(target_id, ".", name))
    337     else:
    338         type = answer[1]

Py4JError: An error occurred while calling o158.executeEncode

UPDATE: I tried upping the Spark memory to 30 GB and, upon execution, got a different error:

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-16-18501a880c3f> in <module>
----> 1 mt.checkpoint(f'dnax://{my_database}/CASR/phewas/jeremy_mt_300_500_1_phenotypes.mt/', overwrite=True)

<decorator-gen-1273> in checkpoint(self, output, overwrite, stage_locally, _codec_spec, _read_if_exists, _intervals, _filter_intervals, _drop_cols, _drop_rows)

/opt/conda/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper
...

/opt/conda/lib/python3.6/site-packages/hail/backend/py4j_backend.py in deco(*args, **kwargs)
     29                 raise FatalError('%s\n\nJava stack trace:\n%s\n'
     30                                  'Hail version: %s\n'
---> 31                                  'Error summary: %s' % (deepest, full, hail.__version__, deepest), error_id) from None
     32         except pyspark.sql.utils.CapturedException as e:
     33             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: OutOfMemoryError: null

Java stack trace:
java.lang.OutOfMemoryError: null
	at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
	at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
	at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
	at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
	at is.hail.io.StreamBlockOutputBuffer.writeBlock(OutputBuffers.scala:293)
	at is.hail.io.BlockingOutputBuffer.writeBlock(OutputBuffers.scala:190)
	at is.hail.io.BlockingOutputBuffer.flush(OutputBuffers.scala:195)
	at is.hail.io.BlockingOutputBuffer.close(OutputBuffers.scala:200)
	at is.hail.io.CompiledEncoder.close(Encoder.scala:27)
	at is.hail.utils.package$.using(package.scala:658)
	at is.hail.io.AbstractTypedCodecSpec$class.encode(CodecSpec.scala:37)
	at is.hail.io.TypedCodecSpec.encode(TypedCodecSpec.scala:19)
	at is.hail.expr.ir.EncodedLiteral$.fromPTypeAndAddress(IR.scala:100)
	at is.hail.expr.ir.CompileAndEvaluate$.evalToIR(CompileAndEvaluate.scala:37)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:30)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.is$hail$expr$ir$LowerOrInterpretNonCompilable$$rewrite$1(LowerOrInterpretNonCompilable.scala:67)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.is$hail$expr$ir$LowerOrInterpretNonCompilable$$rewrite$1(LowerOrInterpretNonCompilable.scala:53)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:72)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:16)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:16)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:14)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass$class.apply(LoweringPass.scala:14)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:64)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:15)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:13)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:13)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:47)
	at is.hail.backend.spark.SparkBackend.is$hail$backend$spark$SparkBackend$$_execute(SparkBackend.scala:381)
	at is.hail.backend.spark.SparkBackend$$anonfun$8$$anonfun$apply$4.apply(SparkBackend.scala:417)
	at is.hail.backend.spark.SparkBackend$$anonfun$8$$anonfun$apply$4.apply(SparkBackend.scala:414)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1$$anonfun$apply$1.apply(ExecuteContext.scala:47)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1$$anonfun$apply$1.apply(ExecuteContext.scala:47)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:47)
	at is.hail.backend.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:46)
	at is.hail.utils.package$.using(package.scala:638)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
	at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:46)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:275)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:414)
	at is.hail.backend.spark.SparkBackend$$anonfun$8.apply(SparkBackend.scala:413)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:413)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)



Hail version: 0.2.78-b17627756568
Error summary: OutOfMemoryError: null

So I guess there is some memory error. I wondered whether my Tables/MatrixTables are partitioned appropriately. For context: I am using a Table with 450,000 rows and ~1,500 row struct fields (each with ~5 sub-fields) to annotate a MatrixTable of ~1,000 variants x 450,000 samples that already has a large number of row and column fields (I’d guess about 40 of each).

It looks like the MatrixTable had 22 partitions; I’m not sure how many partitions the Table had (I was loading tables of different sizes and didn’t keep track). I’ve now reloaded the MatrixTable with _n_partitions=2000 in hl.read_matrix_table(), and it has 319 partitions. I also tried to boost the number of partitions of the Table, but that failed, seemingly because of a bug (hl.read_table complained about an empty array when I passed it _n_partitions); it has ~700 partitions.
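For reference, the reload looked roughly like this (the path is a placeholder):

import hail as hl

# Re-read the MatrixTable asking for more partitions (path is a placeholder);
# the result can end up with fewer partitions than requested, which is how I got 319.
mt = hl.read_matrix_table(f'dnax://{my_database}/CASR/phewas/some_checkpoint.mt',
                          _n_partitions=2000)
print(mt.n_partitions())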

After the repartitioning, the job still fails with some combination of the errors above.

Ultimately, perhaps the solution from this post is what I ought to try; alternatively, I could try to serialize my code using some complicated exec() statement.

UPDATE 2: I have refactored the code using an exec() statement, and it seems to be working! Definitely not a long-term solution, but OK for now.

Best,
Jeremy

Great that things seem to be working, but to address the core issue we might need to look at the code you’re running for the pheWAS. I’m assuming it’s something like:

  1. annotate cols with phenotype values
  2. run linear_regression_rows on all phenotypes against genotype

Hail currently localizes all column data in memory, which is good for performance until you start doing things like pheWAS, when it can easily OOM. In examples like the Neale lab’s UKB rapid GWAS, the most productive workflow has been to annotate phenotypes, run linear regressions, and export results files in batches of ~100-200 phenotypes at a time. At some number of phenotypes this is more efficient anyway, due to better cache utilization in the performance-critical GWAS inner loop.
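A minimal sketch of that batched workflow, assuming quantitative phenotypes stored as column fields (the field names, covariates, and output paths below are all placeholders):

import hail as hl

batch_size = 100
pheno_fields = [f'pheno_{i}' for i in range(1500)]  # placeholder phenotype field names

for start in range(0, len(pheno_fields), batch_size):
    batch = pheno_fields[start:start + batch_size]
    results = hl.linear_regression_rows(
        y=[mt[p] for p in batch],           # one regression per phenotype in the batch
        x=mt.GT.n_alt_alleles(),            # placeholder genotype encoding
        covariates=[1.0, mt.sex, mt.age])   # placeholder covariates
    results.export(f'batch_{start // batch_size}.tsv.bgz')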

Thank you Tim! I will give this a shot and report back.

Best,
Jeremy

Hi Tim et al., I think I have tried to do what you’re suggesting: annotate the columns of the MatrixTable with a chunk of the phenotypes at a time, run a set of regressions, then re-load the MatrixTable and annotate it with the next chunk, and so on. The main issue now is that it’s super slow. Any suggestions?

One thing that came up previously is that there’s no way to quickly run logistic regression on phenotypes that all have different missingness patterns (see this post), which is why I’m currently running them one by one.

Here’s the (somewhat simplified) code:

This chunk (thanks to Dan from this post) runs once for every ~40 phenotypes (I use hl.import_table() to import ~40 phenotypes at a time):

# Collect the set of phecodes present in this chunk
phecodes = ht.aggregate(hl.agg.collect_as_set(ht.phecode))

# Pivot to one row per sample, with a dict mapping phecode -> that phecode's row
ht = ht.group_by(
    ht.sample_id
).aggregate(
    phenos = hl.dict(hl.agg.collect((ht.phecode, ht.row)))
)

# Flatten the dict into one field per phecode, then annotate the MatrixTable columns
ht = ht.annotate(**{
    phecode: ht.phenos.get(phecode)
    for phecode in phecodes
})
mt = mt.annotate_cols(**ht[mt.col_key])

Then, for each of the 40 phenotypes, I run the following code:

# Firth logistic regression of the burden score against one binary phenotype,
# adjusting for sex, age (UKB field 21003), age squared, and the first 10
# genetic principal components (UKB field 22009)
regression_results = hl.logistic_regression_rows(
    test='firth',
    y=mt_burden[f'{phenotype}'].has_phenotype,
    x=mt_burden.n_variants,
    covariates=[1.0,
                mt_burden.sex_float,
                mt_burden.p21003_i0_squared,
                mt_burden.p21003_i0_float,
                mt_burden.p22009_a1,
                mt_burden.p22009_a2,
                mt_burden.p22009_a3,
                mt_burden.p22009_a4,
                mt_burden.p22009_a5,
                mt_burden.p22009_a6,
                mt_burden.p22009_a7,
                mt_burden.p22009_a8,
                mt_burden.p22009_a9,
                mt_burden.p22009_a10])
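For completeness, the driving loop looks roughly like this (the phenotype list and output path are placeholders, and exporting each result as it’s produced is just one way to handle the output):

# Rough sketch of the per-phenotype driver (names and paths are placeholders)
for phenotype in chunk_phenotypes:  # the ~40 phenotypes in the current chunk
    regression_results = hl.logistic_regression_rows(
        test='firth',
        y=mt_burden[phenotype].has_phenotype,
        x=mt_burden.n_variants,
        covariates=[1.0, mt_burden.sex_float])  # covariate list truncated for brevity
    regression_results.export(f'{output_dir}/{phenotype}_results.tsv.bgz')  # placeholder path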

Best,
Jeremy