hl.maximal_independent_set - job 'cancelled because SparkContext was shut down'

I would agree they are a little difficult to debug! From what I’ve read online, I suspect it may be running out of memory. I have uploaded the log file here.

Hi, I was wondering if you had any updates regarding this. The same problem is ongoing. Kind regards, Sam

Hi, just following up again. I would really like to be able to use your PC-Relate implementation on our local cluster. This is proving tricky because of a variety of ‘heartbeat’-type errors; I think it is running out of memory (there is ~3 TB on this node). From what I can see, the command puts a massive amount of data (more than 1-2 TB) into in-memory RDD storage. Is there any way to avoid this, for example by putting this data in temporary SSD storage instead?

Hey @samkleeman1 , sorry for the delay.

The relevant exception is this one:

java.lang.OutOfMemoryError: Java heap space
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:141)
	at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:139)
	at breeze.linalg.DenseMatrix$.zeros$mDc$sp(DenseMatrix.scala:345)
	at is.hail.linalg.BlockMatrixMultiplyRDD.compute(BlockMatrix.scala:1860)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

You’re running out of memory during the matrix multiply. We usually assume 3.75 GB of RAM per core. The pc_relate function takes a block_size parameter that controls memory usage during the multiply. It defaults to 4096, i.e. 4096 x 4096 blocks of 8-byte doubles, roughly 134 MB each. We assume three such blocks fit in memory at any given time. Unfortunately, in our experience Spark is already using so much memory that larger blocks exhaust it. I suggest trying 2048 or 1024.
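The block-size advice above is easy to check with a little arithmetic. The sketch below assumes, as the reply describes, that each block is a dense square matrix of float64 values and that three blocks are held at once; the helper names are illustrative, not part of Hail.

```python
# Rough memory arithmetic for pc_relate's block_size parameter.
# Assumption: each block is a dense block_size x block_size matrix of
# float64 values (8 bytes each), and three blocks are held at once,
# as described in the reply above.

BYTES_PER_FLOAT64 = 8
BLOCKS_IN_FLIGHT = 3       # "three of those ... at any given time"
RAM_PER_CORE_GB = 3.75     # per-core budget quoted above


def block_bytes(block_size: int) -> int:
    """Bytes needed for one dense block_size x block_size float64 block."""
    return block_size * block_size * BYTES_PER_FLOAT64


def working_set_gb(block_size: int) -> float:
    """Approximate GB (10**9 bytes) for the blocks held concurrently."""
    return BLOCKS_IN_FLIGHT * block_bytes(block_size) / 1e9


if __name__ == "__main__":
    for size in (4096, 2048, 1024):
        print(f"block_size={size}: one block ~{block_bytes(size) / 1e6:.0f} MB, "
              f"three blocks ~{working_set_gb(size):.2f} GB "
              f"of a {RAM_PER_CORE_GB} GB per-core budget")
```

Because block memory grows with the square of block_size, halving it from 4096 to 2048 cuts each block by a factor of four (about 134 MB down to about 34 MB).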

We do use Spark’s cache functionality to keep the matrix blocks in memory as much as possible, but that should evict blocks when memory pressure gets too high.

Okay, many thanks for the update. I am trying to run the script below. I pre-computed the PCA vectors with PLINK2's approx mode to save computation time. I am still unclear whether I need to set executor memory on a local instance.

import hail as hl
import os
import pandas as pd
import subprocess

# Define memory and CPU availability

tmp = "/mnt/grid/janowitz/rdata_norepl/tmp"

os.environ["SPARK_LOCAL_DIRS"] = tmp
os.environ["PYSPARK_SUBMIT_ARGS"] = "--conf spark.network.timeout=5m --conf spark.executor.heartbeatInterval=1m --conf spark.memory.fraction=1.0 --driver-memory 2880g --executor-memory 5g pyspark-shell"

hl.init(default_reference='GRCh38', master='local[96]', min_block_size=128, local_tmpdir=tmp, tmp_dir=tmp)

ukb_gwas = hl.read_matrix_table('/mnt/grid/ukbiobank/data/ApplicationXXXXX/skleeman/ukb_grch38_pruned.mt')

scores = hl.import_table('/mnt/grid/janowitz/home/skleeman/ukbiobank/cancergwas/relatedness/pca_all.eigenvec', impute=True, types={'IID': hl.tstr})
scores = scores.key_by('IID')
scores = scores.transmute(scores=[scores.PC1, scores.PC2, scores.PC3, scores.PC4, scores.PC5, scores.PC6, scores.PC7, scores.PC8, scores.PC9, scores.PC10])

ukb_gwas = ukb_gwas.repartition(1000)

relatedness_ht = hl.pc_relate(ukb_gwas.GT, min_individual_maf=0.05,
                              scores_expr=scores[ukb_gwas.col_key].scores,
                              min_kinship=0.05, statistics='kin', block_size=1024)

relatedness_ht.write("/mnt/grid/ukbiobank/data/ApplicationXXXX/skleeman/relatedness_ukb.ht", overwrite=True)

Ah, sorry, I didn’t realize you were running on a single machine. Are you already setting PYSPARK_SUBMIT_ARGS?

Yes, the command is in the script above; copied below:

os.environ["PYSPARK_SUBMIT_ARGS"] = "--conf spark.network.timeout=5m --conf spark.executor.heartbeatInterval=1m --conf spark.memory.fraction=1.0 --driver-memory 2880g --executor-memory 5g pyspark-shell"

Hmm, I haven’t tried setting it in the Python script, but if it seems to work for you, let’s stick with it.

How many executors do you have? Setting executor memory to 5g seems really low. How many cores does each executor have? I’d reduce the driver memory dramatically. Not much work happens in the driver, so I’d put it at something like 30g.

This is being run on a node with 96 cores and 3 TB of RAM. So I guess this means I have either 96 executors with one core each or one executor with 96 cores; to be honest, I’m not sure. I have found that the driver memory setting seems to determine the amount of memory shown on the Spark Executors page (see attached image). If I reduce it, the job fails for lack of memory. I’m not sure of the best way to run this in a local environment and would be very grateful for any guidance.

Nobody is ever sure with Spark 😉. Hmm. It looks like you just have a driver and no executors? In that case, it would seem that you’ve already supplied all available memory to the job. Can you share the Hail log file?
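The exchange above suggests that with master='local[96]' there is a single JVM (the driver) running all 96 task threads, so the driver's memory is effectively the job's memory and --executor-memory does not size anything separately. The sketch below shows one way to assemble PYSPARK_SUBMIT_ARGS under that assumption. It keeps the timeout settings from the script, sets only --driver-memory, and drops --executor-memory. The helper build_submit_args is hypothetical, and 2880g is simply the value already used in this thread, not a recommendation.

```python
# Sketch: assemble PYSPARK_SUBMIT_ARGS for a single-machine (local-mode) run.
# Assumption: in local mode the executor lives inside the driver JVM, so
# --driver-memory sets the heap that does the work and --executor-memory
# is left out. The conf keys are the ones already used in this thread.
import os
import shlex
from typing import Dict


def build_submit_args(driver_memory: str, conf: Dict[str, str]) -> str:
    """Return a PYSPARK_SUBMIT_ARGS string ending in 'pyspark-shell'."""
    parts = []
    for key, value in conf.items():
        parts += ["--conf", f"{key}={value}"]
    parts += ["--driver-memory", driver_memory, "pyspark-shell"]
    # Quote each token so values containing spaces survive shell-style splitting.
    return " ".join(shlex.quote(p) for p in parts)


if __name__ == "__main__":
    args = build_submit_args(
        driver_memory="2880g",
        conf={
            "spark.network.timeout": "5m",
            "spark.executor.heartbeatInterval": "1m",
        },
    )
    # This must be set before hl.init() launches the JVM.
    os.environ["PYSPARK_SUBMIT_ARGS"] = args
    print(args)
```

After setting the variable, hl.init(master='local[96]', ...) is called as in the original script.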

Nothing seems unusual about this to me. You should have lots of excess memory available.

Sadly this script failed; the error is copied below. I see this error repeatedly when I run this task. The full log file is available here (hail-20210122-1117-0.2.61-291a63b97bd9.log on Google Drive). I’m really grateful for any assistance and would love to get this working.

LOGGING: writing to /mnt/grid/janowitz/home/skleeman/ukbiobank/cancergwas/relatedness/hail-20210122-1117-0.2.61-291a63b97bd9.log
2021-01-22 11:18:04 Hail: INFO: Reading table to impute column types
2021-01-22 11:18:08 Hail: INFO: Finished type imputation
  Loading field '#FID' as type int32 (imputed)
  Loading field 'IID' as type str (user-supplied type)
  Loading field 'PC1' as type float64 (imputed)
  Loading field 'PC2' as type float64 (imputed)
  Loading field 'PC3' as type float64 (imputed)
  Loading field 'PC4' as type float64 (imputed)
  Loading field 'PC5' as type float64 (imputed)
  Loading field 'PC6' as type float64 (imputed)
  Loading field 'PC7' as type float64 (imputed)
  Loading field 'PC8' as type float64 (imputed)
  Loading field 'PC9' as type float64 (imputed)
  Loading field 'PC10' as type float64 (imputed)
2021-01-22 15:18:33 Hail: INFO: Wrote all 125928 blocks of 269832 x 488377 matrix with block size 1024.
2021-01-22 18:27:25 Hail: INFO: wrote matrix with 11 rows and 269832 columns as 264 blocks of size 1024 to /mnt/grid/janowitz/rdata_norepl/tmp/pcrelate-write-read-IqUNZs2sxvRN4kr5Ah3UJK.bm
Traceback (most recent call last):
  File "relate.py", line 26, in <module>
    relatedness_ht.write("/mnt/grid/ukbiobank/data/Application58510/skleeman/relatedness_ukb.ht", overwrite=True)
  File "<decorator-gen-1095>", line 2, in write
  File "/grid/wsbs/home_norepl/skleeman/hail/hail/python/hail/typecheck/check.py", line 614, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/grid/wsbs/home_norepl/skleeman/hail/hail/python/hail/table.py", line 1271, in write
    Env.backend().execute(ir.TableWrite(self._tir, ir.TableNativeWriter(output, overwrite, stage_locally, _codec_spec)))
  File "/grid/wsbs/home_norepl/skleeman/hail/hail/python/hail/backend/py4j_backend.py", line 98, in execute
    raise e
  File "/grid/wsbs/home_norepl/skleeman/hail/hail/python/hail/backend/py4j_backend.py", line 74, in execute
    result = json.loads(self._jhc.backend().executeJSON(jir))
  File "/grid/wsbs/home_norepl/skleeman/.local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/grid/wsbs/home_norepl/skleeman/hail/hail/python/hail/backend/py4j_backend.py", line 32, in deco
    'Error summary: %s' % (deepest, full, hail.__version__, deepest), error_id) from None
hail.utils.java.FatalError: SparkException: Job aborted due to stage failure: Task 27483 in stage 11.0 failed 1 times, most recent failure: Lost task 27483.0 in stage 11.0 (TID 30330, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 800992 ms
Driver stacktrace:

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 27483 in stage 11.0 failed 1 times, most recent failure: Lost task 27483.0 in stage 11.0 (TID 30330, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 800992 ms
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:176)
	at is.hail.utils.richUtils.RichContextRDD.writePartitions(RichContextRDD.scala:112)
	at is.hail.utils.richUtils.RichRDD$.writePartitions$extension(RichRDD.scala:204)
	at is.hail.linalg.BlockMatrix.write(BlockMatrix.scala:872)
	at is.hail.methods.PCRelate.writeRead(PCRelate.scala:159)
	at is.hail.methods.PCRelate.mu(PCRelate.scala:217)
	at is.hail.methods.PCRelate.computeResult(PCRelate.scala:173)
	at is.hail.methods.PCRelate.execute(PCRelate.scala:146)
	at is.hail.expr.ir.BlockMatrixToTableApply.execute(TableIR.scala:2786)
	at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:1846)
	at is.hail.expr.ir.TableFilter.execute(TableIR.scala:1280)
	at is.hail.expr.ir.TableKeyBy.execute(TableIR.scala:1210)
	at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:1846)
	at is.hail.expr.ir.TableKeyBy.execute(TableIR.scala:1210)
	at is.hail.expr.ir.Interpret$.run(Interpret.scala:825)
	at is.hail.expr.ir.Interpret$.alreadyLowered(Interpret.scala:53)
	at is.hail.expr.ir.InterpretNonCompilable$.interpretAndCoerce$1(InterpretNonCompilable.scala:16)
	at is.hail.expr.ir.InterpretNonCompilable$.is$hail$expr$ir$InterpretNonCompilable$$rewrite$1(InterpretNonCompilable.scala:53)
	at is.hail.expr.ir.InterpretNonCompilable$.apply(InterpretNonCompilable.scala:58)
	at is.hail.expr.ir.lowering.InterpretNonCompilablePass$.transform(LoweringPass.scala:67)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:15)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:15)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:15)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:13)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass$class.apply(LoweringPass.scala:13)
	at is.hail.expr.ir.lowering.InterpretNonCompilablePass$.apply(LoweringPass.scala:62)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:14)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:12)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:12)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:28)
	at is.hail.backend.spark.SparkBackend.is$hail$backend$spark$SparkBackend$$_execute(SparkBackend.scala:360)
	at is.hail.backend.spark.SparkBackend$$anonfun$execute$1.apply(SparkBackend.scala:344)
	at is.hail.backend.spark.SparkBackend$$anonfun$execute$1.apply(SparkBackend.scala:341)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:25)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:23)
	at is.hail.utils.package$.using(package.scala:618)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:12)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:23)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:254)
	at is.hail.backend.spark.SparkBackend.execute(SparkBackend.scala:341)
	at is.hail.backend.spark.SparkBackend$$anonfun$7.apply(SparkBackend.scala:385)
	at is.hail.backend.spark.SparkBackend$$anonfun$7.apply(SparkBackend.scala:383)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeJSON(SparkBackend.scala:383)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)



Hail version: 0.2.61-291a63b97bd9
Error summary: SparkException: Job aborted due to stage failure: Task 27483 in stage 11.0 failed 1 times, most recent failure: Lost task 27483.0 in stage 11.0 (TID 30330, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 800992 ms
Driver stacktrace:
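The block counts in the log above can be checked directly, and the same dimensions give a sense of scale. The sketch below assumes every entry is an 8-byte float64 held densely; the real footprint may differ (compression, caching, partial edge blocks), and the helper names are illustrative.

```python
# Check the block counts reported in the Hail log above and estimate the
# size of the genotype matrix if it were held densely.
# Assumption: every entry is a float64 (8 bytes).
import math

BYTES_PER_FLOAT64 = 8


def n_blocks(n_rows: int, n_cols: int, block_size: int) -> int:
    """Number of blocks in a block matrix, counting partial edge blocks."""
    return math.ceil(n_rows / block_size) * math.ceil(n_cols / block_size)


def dense_tb(n_rows: int, n_cols: int) -> float:
    """Dense float64 size in terabytes (10**12 bytes)."""
    return n_rows * n_cols * BYTES_PER_FLOAT64 / 1e12


if __name__ == "__main__":
    # Dimensions taken from the log lines above.
    print(n_blocks(269832, 488377, 1024))        # 125928
    print(n_blocks(11, 269832, 1024))            # 264
    print(f"{dense_tb(269832, 488377):.2f} TB")  # 1.05 TB
```

Both block counts match the log, and a dense float64 copy of the 269832 x 488377 matrix alone would be about 1.05 TB, which is consistent with the 1-2 TB of RDD storage described earlier in the thread.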

Huh, I have no explanation. It looks like Spark decides that some executor has failed and then promptly ceases doing any further work at all.

Is the file system at /mnt/grid/janowitz/rdata_norepl/tmp/ running out of space?

It is pretty strange! There should be 100 TB of space available.
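To confirm the free space rather than rely on the expected figure, one option is a quick check with Python's standard library. The sketch below uses the scratch path from the script in this thread. A single check only shows the state at that moment, so it may be worth repeating while the job is running.

```python
# Check free space on the scratch directory used for SPARK_LOCAL_DIRS / tmp_dir.
# The path is the one from the script in this thread; adjust as needed.
import os
import shutil

TMP_DIR = "/mnt/grid/janowitz/rdata_norepl/tmp"


def free_tb(path: str) -> float:
    """Free space in TB (10**12 bytes) on the file system holding `path`."""
    return shutil.disk_usage(path).free / 1e12


if __name__ == "__main__":
    if os.path.isdir(TMP_DIR):
        print(f"{TMP_DIR}: {free_tb(TMP_DIR):.1f} TB free")
    else:
        print(f"{TMP_DIR} not found on this machine")
```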

If you have any ideas, let me know. I have tried running it again several times with different block sizes and get the same error every time.

This is a known issue, though I have no idea what is causing it. I also see it while running benchmarks in local mode in a Docker container (either on my Mac or in the cloud).

Hi,

Thanks for looking into this. Is there any way to work around this issue? I am fairly keen to avoid running this command on Google Cloud, as it would be pretty expensive.

Sam

Since I don’t know the root cause, I can’t suggest a workaround right now. Every time I’ve seen this failure mode, though, it’s been in a pipeline that uses Breeze (a Scala linear algebra library), so it could be a problem with native code.

Can I run Hail from a Docker container?

Yes, but I’ve also seen this issue appear in Docker runtimes.

I should mention that I have got pc_relate working on a much smaller dataset (n = 3,000 vs n = 500,000), so the problem seems specific to sample size.

Sam
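One way to see why the sample size matters so much is a back-of-envelope estimate of a sample-by-sample matrix. The sketch below assumes, without confirmation from this thread, that pc_relate builds a dense n x n float64 matrix of pairwise statistics before filtering by min_kinship. Under that assumption its size grows with the square of the sample count. The sample counts are the ones quoted above, and the helper name is illustrative.

```python
# Back-of-envelope scaling for a dense sample-by-sample matrix.
# Assumption (not confirmed in this thread): pc_relate materializes a
# dense n x n float64 matrix of pairwise statistics before filtering
# by min_kinship, so its size grows with the square of the sample count.

BYTES_PER_FLOAT64 = 8


def pairwise_matrix_tb(n_samples: int) -> float:
    """Dense n x n float64 size in TB (10**12 bytes)."""
    return n_samples * n_samples * BYTES_PER_FLOAT64 / 1e12


if __name__ == "__main__":
    small, large = 3_000, 500_000  # sample counts quoted in the post above
    print(f"growth factor: {(large / small) ** 2:,.0f}x")
    print(f"n={small}: {pairwise_matrix_tb(small) * 1e3:.3f} GB")
    print(f"n={large}: {pairwise_matrix_tb(large):.2f} TB")
```

If the assumption holds, going from 3,000 to 500,000 samples multiplies that matrix's size by roughly 28,000, from well under 1 GB to about 2 TB. This could explain why the small run succeeds while the full run strains a 3 TB node.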