Failing to export hl.Table as a TSV file

I use Hail to perform QC on summary statistics. After the QC steps I can print the first few rows of the resulting table, but when I try to export the hl.Table as a TSV file, it gets stuck in a stage and throws an error after some time:

sumstats = hl.import_table(xxx)
...
sumstats = sumstats.annotate(xxx)
sumstats = sumstats.filter(xxx)

# var_list is another Hail Table containing the variants I want to intersect with the summary statistics
var_list = var_list.annotate(**sumstats[var_list.locus])
var_list = var_list.annotate(xxx)
var_list = var_list.filter(xxx)

# select some fields to export as a TSV file
sumstats_QCed = var_list.select(
    rsid=var_list.rsid,
    alleles1=var_list.alleles[0],
    alleles2=var_list.alleles[1],
    xxx)


# this will work
sumstats_QCed.show(5)

# this will fail
sumstats_QCed.export(f"{bucket}/res.tsv")

The error message looks like this:

FatalError: SparkException: Job aborted due to stage failure: ResultStage 9 (collect at SparkBackend.scala:355) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Failed to connect to all-of-us-2936-sw-hcm9.c.terra-vpc-sc-fd39b54c.internal:7337 

I really appreciate your help!
Taotao

Try using only primary (non-preemptible) workers in your cluster. Spark shuffles are not robust to preemptible VMs.
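
If you manage your own Dataproc cluster with hailctl, the idea looks roughly like the line below (the cluster name and worker count are placeholders, and the flag names may differ across Hail versions); on Terra/All of Us the worker configuration is set in the cloud environment panel rather than on a command line.

# hypothetical hailctl invocation: primary workers only, no secondary (preemptible) workers
hailctl dataproc start my-cluster --num-workers 4 --num-secondary-workers 0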

Thanks Tim. I am still getting an error even when using regular (non-preemptible) workers:

2023-03-22 01:22:18.785 Hail: INFO: wrote table with 1949630 rows in 1 partition to /tmp/persist_table6EMWdcwEsQ
2023-03-22 01:22:20.929 Hail: INFO: Reading table to impute column types 1) / 1]
2023-03-22 01:23:28.577 Hail: INFO: Loading <StructExpression of type struct{chr: str, pos: str, ref: str, alt: str, af_meta_hq: float64, beta_meta_hq: float64, se_meta_hq: float64, neglog10_pval_meta_hq: float64, neglog10_pval_heterogeneity_hq: float64, af_meta: float64, beta_meta: float64, se_meta: float64, neglog10_pval_meta: float64, neglog10_pval_heterogeneity: float64, af_AFR: float64, af_AMR: float64, af_CSA: float64, af_EAS: float64, af_EUR: float64, beta_AFR: float64, beta_AMR: float64, beta_CSA: float64, beta_EAS: float64, beta_EUR: float64, se_AFR: float64, se_AMR: float64, se_CSA: float64, se_EAS: float64, se_EUR: float64, neglog10_pval_AFR: float64, neglog10_pval_AMR: float64, neglog10_pval_CSA: float64, neglog10_pval_EAS: float64, neglog10_pval_EUR: float64, low_confidence_AFR: bool, low_confidence_AMR: bool, low_confidence_CSA: bool, low_confidence_EAS: bool, low_confidence_EUR: bool}> fields. Counts by type:
  float64: 30
  bool: 5
  str: 4
2023-03-22 01:25:15.065 Hail: INFO: Ordering unsorted dataset with network shuffle
2023-03-22 01:27:00.788 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 9:>                                                          (0 + 1) / 1]
---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
/tmp/ipykernel_487/220662931.py in <module>
      4 sumstats_QC(f"{bucket}/Sumstats/continuous-50-both_sexes-irnt.tsv.bgz",
      5             f"{bucket}/Sumstats/Array_Height_QCed.tsv",
----> 6             var_array)
      7 
      8 print("DBP " + str(datetime.now()))

/tmp/ipykernel_487/483972014.py in sumstats_QC(filename_in, filename_out, var_array)
     69                         neglog10_pval_meta_hq = var_array.neglog10_pval_meta_hq)
     70 
---> 71     sumstats_QCed.export(filename_out)

<decorator-gen-1190> in export(self, output, types_file, header, parallel, delimiter)

/opt/conda/lib/python3.7/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper

/opt/conda/lib/python3.7/site-packages/hail/table.py in export(self, output, types_file, header, parallel, delimiter)
   1097         parallel = ir.ExportType.default(parallel)
   1098         Env.backend().execute(
-> 1099             ir.TableWrite(self._tir, ir.TableTextWriter(output, types_file, header, parallel, delimiter)))
   1100 
   1101     def group_by(self, *exprs, **named_exprs) -> 'GroupedTable':

/opt/conda/lib/python3.7/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
    103             return (value, timings) if timed else value
    104         except FatalError as e:
--> 105             raise e.maybe_user_error(ir) from None
    106 
    107     async def _async_execute(self, ir, timed=False):

/opt/conda/lib/python3.7/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
     97         # print(self._hail_package.expr.ir.Pretty.apply(jir, True, -1))
     98         try:
---> 99             result_tuple = self._jbackend.executeEncode(jir, stream_codec, timed)
    100             (result, timings) = (result_tuple._1(), result_tuple._2())
    101             value = ir.typ._from_encoding(result)

/opt/conda/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1321         answer = self.gateway_client.send_command(command)
   1322         return_value = get_return_value(
-> 1323             answer, self.gateway_client, self.target_id, self.name)
   1324 
   1325         for temp_arg in temp_args:

/opt/conda/lib/python3.7/site-packages/hail/backend/py4j_backend.py in deco(*args, **kwargs)
     29             tpl = Env.jutils().handleForPython(e.java_exception)
     30             deepest, full, error_id = tpl._1(), tpl._2(), tpl._3()
---> 31             raise fatal_error_from_java_error_triplet(deepest, full, error_id) from None
     32         except pyspark.sql.utils.CapturedException as e:
     33             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 4 times, most recent failure: Lost task 0.3 in stage 9.0 (TID 82) (all-of-us-2936-w-0.c.terra-vpc-sc-fd39b54c.internal executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 122615 ms
Driver stacktrace:

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 4 times, most recent failure: Lost task 0.3 in stage 9.0 (TID 82) (all-of-us-2936-w-0.c.terra-vpc-sc-fd39b54c.internal executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 122615 ms
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at is.hail.backend.spark.SparkBackend.parallelizeAndComputeWithIndex(SparkBackend.scala:355)
	at is.hail.backend.BackendUtils.collectDArray(BackendUtils.scala:43)
	at __C1874Compiled.apply(Emit.scala)
	at is.hail.expr.ir.CompileAndEvaluate$.$anonfun$_apply$3(CompileAndEvaluate.scala:57)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:57)
	at is.hail.expr.ir.CompileAndEvaluate$.evalToIR(CompileAndEvaluate.scala:30)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:30)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.rewrite$1(LowerOrInterpretNonCompilable.scala:67)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:72)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$3(LoweringPass.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$1(LoweringPass.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass.apply(LoweringPass.scala:14)
	at is.hail.expr.ir.lowering.LoweringPass.apply$(LoweringPass.scala:13)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:64)
	at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1(LoweringPipeline.scala:15)
	at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1$adapted(LoweringPipeline.scala:13)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:13)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:47)
	at is.hail.backend.spark.SparkBackend._execute(SparkBackend.scala:450)
	at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$2(SparkBackend.scala:486)
	at is.hail.backend.ExecuteContext$.$anonfun$scoped$3(ExecuteContext.scala:70)
	at is.hail.utils.package$.using(package.scala:635)
	at is.hail.backend.ExecuteContext$.$anonfun$scoped$2(ExecuteContext.scala:70)
	at is.hail.utils.package$.using(package.scala:635)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
	at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:59)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:339)
	at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$1(SparkBackend.scala:483)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:482)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)



Hail version: 0.2.107-2387bb00ceee
Error summary: SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 4 times, most recent failure: Lost task 0.3 in stage 9.0 (TID 82) (all-of-us-2936-w-0.c.terra-vpc-sc-fd39b54c.internal executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 122615 ms
Driver stacktrace:

This new error often indicates that the machines are running out of memory. I generally recommend writing to compressed text: “.tsv.bgz”. Hail will automatically use compression if you specify that file extension.
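
For example, using the variable names from your snippet (a minimal sketch):

# with a ".bgz" extension Hail block-compresses the output automatically
sumstats_QCed.export(f"{bucket}/res.tsv.bgz")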

It’s also possible this pipeline will benefit from isolating import_table from the main work. You might try a x = x.checkpoint(“gs://…”) before the point where you annotate using the imported sum stats table.
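
A sketch of that, with a placeholder path and the names from your snippet:

# checkpoint writes the table out and reads it back, so the import/QC work is
# materialized once rather than recomputed (and reshuffled) by downstream steps
sumstats = sumstats.checkpoint(f"{bucket}/sumstats_checkpoint.ht", overwrite=True)

# then join onto the variant list as before
var_list = var_list.annotate(**sumstats[var_list.locus])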

Generally, though, I'm not sure what's going on. It's surprising to me that you'd run out of memory on such a simple pipeline, but maybe those ellipses hide operations that would trigger high memory use?

Thanks so much @danking. Indeed, this resolved the problem! I guess annotating one table with another (which is essentially a left join) is a memory-hungry step.

Thanks again for the suggestions

I just noticed that you’re running in Terra. Terra has an existing bug where the cluster driver allocates far too little memory to the Hail Java process, which is quite likely the problem you’re hitting here. I’ve just raised this again with the Terra dev team and will report back.

I was using Hail on the All of Us platform. Would it be helpful if I contacted the All of Us team to let them know about this bug?

Yes, that would be great.

(I think it will end up in the same place, because AoU is a fork of Terra, but more noise will get this fixed faster!)

Do you think this bug can affect the computation speed of Hail in general?