Getting "heartbeat timed out" error when running on slurm cluster

Hi Guys,

I’m hoping for your help understanding a SparkException I’ve been getting. I’m running an interactive IPython session on a remote node with 28 CPUs / 250 GB RAM (Slurm cluster). I’ve also set the PYSPARK_SUBMIT_ARGS environment variable to '--driver-memory 8g --executor-memory 8g pyspark-shell'. I keep receiving the following error:
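For reference, here is a minimal sketch of how that variable can be set from within Python (the important part is that it has to be in the environment before the SparkContext is created, otherwise it is effectively ignored):

    import os

    # Must be in the environment before PySpark launches its JVM, i.e. before hl.init()
    os.environ['PYSPARK_SUBMIT_ARGS'] = (
        '--driver-memory 8g --executor-memory 8g pyspark-shell'
    )

    import hail as hl
    hl.init()  # the SparkContext created here picks up the submit args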

In [6]: hl.export_gen(mt,'/gpfs/milgram/data/UKB/ukb_snp/ukb_imp_chr_2_dpsn')
[Stage 2:=============> (374 + 28) / 1505]
---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-6-...> in <module>
----> 1 hl.export_gen(mt,'/gpfs/milgram/data/UKB/ukb_snp/ukb_imp_chr_2_dpsn')
<decorator-gen-...> in export_gen(dataset, output, precision, gp, id1, id2, missing, varid, rsid)
/gpfs/milgram/project/holmes/kma52/miniconda3/lib/python3.7/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    558     def wrapper(__original_func, *args, **kwargs):
    559         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 560         return __original_func(*args_, **kwargs_)
    561
    562     return wrapper
/gpfs/milgram/project/holmes/kma52/miniconda3/lib/python3.7/site-packages/hail/methods/impex.py in export_gen(dataset, output, precision, gp, id1, id2, missing, varid, rsid)
162 entry_exprs=entry_exprs)
163
--> 164 dataset._jmt.exportGen(output, precision)
165
166
/gpfs/milgram/project/holmes/kma52/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:
/gpfs/milgram/project/holmes/kma52/miniconda3/lib/python3.7/site-packages/hail/utils/java.py in deco(*args, **kwargs)
    208             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    209                              'Hail version: %s\n'
--> 210                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
    211         except pyspark.sql.utils.CapturedException as e:
    212             raise FatalError('%s\n\nJava stack trace:\n%s\n'
FatalError: SparkException: Job aborted due to stage failure: Task 386 in stage 2.0 failed 1 times, most recent failure: Lost task 386.0 in stage 2.0 (TID 442, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 170979 ms
Driver stacktrace:
Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 386 in stage 2.0 failed 1 times, most recent failure: Lost task 386.0 in stage 2.0 (TID 442, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 170979 ms
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1533)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1521)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1520)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1520)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1748)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1703)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1692)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1151)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1096)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1070)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:961)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:960)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1489)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1468)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1468)
at is.hail.utils.richUtils.RichRDD$.writeTable$extension(RichRDD.scala:67)
at is.hail.io.gen.ExportGen$.apply(ExportGen.scala:103)
at is.hail.variant.MatrixTable$$anonfun$exportGen$1.apply(MatrixTable.scala:1247)
at is.hail.variant.MatrixTable$$anonfun$exportGen$1.apply(MatrixTable.scala:1247)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:724)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:107)
at is.hail.expr.ir.Interpret$.apply(Interpret.scala:77)
at is.hail.variant.MatrixTable.exportGen(MatrixTable.scala:1247)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Hail version: 0.2.4-82d3af919183
Error summary: SparkException: Job aborted due to stage failure: Task 386 in stage 2.0 failed 1 times, most recent failure: Lost task 386.0 in stage 2.0 (TID 442, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 170979 ms
Driver stacktrace:

Puzzlingly, the export command runs without error on a Linux machine with the same configuration (8 CPUs, 64 GB RAM). It only seems to fail when submitted as a Slurm job. I’m wondering if there is a tweak to my SparkConf that could help? Here is the setup:

In [9]: hl.context.SparkConf().getAll()
Out[9]:
[('spark.driver.extraClassPath',
  '/gpfs/milgram/project/holmes/kma52/miniconda3/lib/python3.7/site-packages/hail/hail-all-spark.jar'),
 ('spark.executor.extraClassPath',
  '/gpfs/milgram/project/holmes/kma52/miniconda3/lib/python3.7/site-packages/hail/hail-all-spark.jar'),
 ('spark.driver.memory', '8g'),
 ('spark.master', 'local[*]'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'pyspark-shell')]

Should there be a line that says ('spark.executor.memory', '8g') here?
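If setting executor memory explicitly is the fix, I’m assuming something like the sketch below would do it (untested; it builds the SparkContext by hand with pyspark's SparkConf and hands it to hl.init, with the values copied from my submit args):

    from pyspark import SparkConf, SparkContext
    import hail as hl

    # Hypothetical explicit configuration; values mirror my PYSPARK_SUBMIT_ARGS
    conf = (SparkConf()
            .setMaster('local[*]')
            .set('spark.driver.memory', '8g')
            .set('spark.executor.memory', '8g'))

    sc = SparkContext(conf=conf)
    hl.init(sc=sc)  # wrap the pre-built context instead of letting Hail create one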

I’ve scoured stackoverflow for a fix, but would really appreciate any thoughts/guidance!

Thanks!
Kevin

This is pretty weird - that’s an error you see when running in cluster mode and one node goes MIA. But it sounds like you’re running in local mode on this beefy node, right?

Hi Tim,
Totally right, I’m running stand-alone Hail in an interactive session on a single node. No matter what I’ve tried, Spark parallelizes across all available CPUs (28). Could it be that the tasks running on all the CPUs are crowding out the driver program, or vice versa? My other thought is that there could be an I/O bottleneck caused by 28 tasks trying to read/write at once.

Do you have any quick tips for tweaking the configuration? Maybe how to run 14 executors with 2 CPUs each? It would help a ton for troubleshooting!

Thanks,
Kevin

The number of threads Spark uses in local mode is controlled by the “master” argument: local[*] will use all available cores, local[8] will use 8, and so on.
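For example (just a sketch; pick whatever thread count you want to experiment with):

    import hail as hl

    # One local JVM with 8 worker threads instead of all 28.
    # In local mode there are no separate executors, so the thread count is the knob.
    # (Equivalently, add "--master local[8]" to PYSPARK_SUBMIT_ARGS before Spark starts.)
    hl.init(master='local[8]')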

The thing that’s baffling me is why Spark is doing a heartbeat at all, given that it should only be running one Java process in local mode.
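If you want to rule out a slow heartbeat rather than a real crash, the timeouts involved are spark.executor.heartbeatInterval and spark.network.timeout. Loosening them is only a diagnostic, not a fix, but a sketch would look like:

    from pyspark import SparkConf, SparkContext
    import hail as hl

    # Diagnostic only: give the heartbeat more slack rather than fixing the root cause.
    conf = (SparkConf()
            .setMaster('local[*]')
            .set('spark.executor.heartbeatInterval', '60s')
            .set('spark.network.timeout', '600s'))

    hl.init(sc=SparkContext(conf=conf))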