Write Bgen Fatal Error

I produced a matrix table, successfully split multiallelic sites, and then tried to write the result to BGEN, which produced the error below; I am not sure what to do. The error does not show up if I skip the split. It also does not show up if I operate on a smaller interval (a single gene extracted with filter_intervals; a sketch of that smaller run appears after the traceback below), so it may be memory-related, in which case the “parallel” option could help. I would appreciate guidance.

My code:

import hail as hl

mt = hl.read_matrix_table(f'{my_bucket}/data/hail_{curr_chr}.mt')
mt_plink = hl.split_multi_hts(mt)  # split multiallelic sites
# One-hot GP lookup table, indexed by alternate-allele count
gp_values = hl.literal([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]])
hl.export_bgen(mt_plink, out_bgen, gp=gp_values[mt_plink.GT.n_alt_alleles()])

The traceback:
FatalError                                Traceback (most recent call last)
/tmp/ipykernel_237/3139653622.py in <module>
      1 # faster step
      2 str(datetime.now())
----> 3 hl.export_bgen(mt_plink, out_bgen, gp=gp_values[mt_plink.GT.n_alt_alleles()])

<decorator-gen-1508> in export_bgen(mt, output, gp, varid, rsid, parallel)

/opt/conda/lib/python3.7/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper

/opt/conda/lib/python3.7/site-packages/hail/methods/impex.py in export_bgen(mt, output, gp, varid, rsid, parallel)
    245     Env.backend().execute(ir.MatrixWrite(mt._mir, ir.MatrixBGENWriter(
    246         output,
--> 247         parallel)))
    248 
    249 

/opt/conda/lib/python3.7/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
    103             return (value, timings) if timed else value
    104         except FatalError as e:
--> 105             raise e.maybe_user_error(ir) from None
    106 
    107     async def _async_execute(self, ir, timed=False):

/opt/conda/lib/python3.7/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
     97         # print(self._hail_package.expr.ir.Pretty.apply(jir, True, -1))
     98         try:
---> 99             result_tuple = self._jbackend.executeEncode(jir, stream_codec, timed)
    100             (result, timings) = (result_tuple._1(), result_tuple._2())
    101             value = ir.typ._from_encoding(result)

/opt/conda/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1321         answer = self.gateway_client.send_command(command)
   1322         return_value = get_return_value(
-> 1323             answer, self.gateway_client, self.target_id, self.name)
   1324 
   1325         for temp_arg in temp_args:

/opt/conda/lib/python3.7/site-packages/hail/backend/py4j_backend.py in deco(*args, **kwargs)
     29             tpl = Env.jutils().handleForPython(e.java_exception)
     30             deepest, full, error_id = tpl._1(), tpl._2(), tpl._3()
---> 31             raise fatal_error_from_java_error_triplet(deepest, full, error_id) from None
     32         except pyspark.sql.utils.CapturedException as e:
     33             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: RemoteException: File /tmp/export-bgen-concatenated-LpLH7K29SkI8krTADdGvj7/part-0300-ee8d1e84-ecdb-45c0-a499-abfe13ac0704 could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation.
	at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2260)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2813)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:908)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:577)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:549)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:518)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)


Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 300 in stage 1.0 failed 4 times, most recent failure: Lost task 300.3 in stage 1.0 (TID 2322) (all-of-us-6636-sw-1zlp.c.terra-vpc-sc-e41b7259.internal executor 370): org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /tmp/export-bgen-concatenated-LpLH7K29SkI8krTADdGvj7/part-0300-ee8d1e84-ecdb-45c0-a499-abfe13ac0704 could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation.
	at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2260)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2813)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:908)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:577)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:549)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:518)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)

	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1589)
	at org.apache.hadoop.ipc.Client.call(Client.java:1535)
	at org.apache.hadoop.ipc.Client.call(Client.java:1432)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
	at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:520)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at com.sun.proxy.$Proxy20.addBlock(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1082)
	at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1898)
	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1700)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:707)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at is.hail.backend.spark.SparkBackend.parallelizeAndComputeWithIndex(SparkBackend.scala:355)
	at is.hail.backend.BackendUtils.collectDArray(BackendUtils.scala:43)
	at __C297Compiled.__m338split_CollectDistributedArray(Emit.scala)
	at __C297Compiled.__m337begin_group_0(Emit.scala)
	at __C297Compiled.apply(Emit.scala)
	at is.hail.expr.ir.CompileAndEvaluate$.$anonfun$_apply$3(CompileAndEvaluate.scala:57)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:57)
	at is.hail.expr.ir.CompileAndEvaluate$.evalToIR(CompileAndEvaluate.scala:30)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:30)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.rewrite$1(LowerOrInterpretNonCompilable.scala:67)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:72)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$3(LoweringPass.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$1(LoweringPass.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass.apply(LoweringPass.scala:14)
	at is.hail.expr.ir.lowering.LoweringPass.apply$(LoweringPass.scala:13)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:64)
	at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1(LoweringPipeline.scala:15)
	at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1$adapted(LoweringPipeline.scala:13)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:13)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:47)
	at is.hail.backend.spark.SparkBackend._execute(SparkBackend.scala:450)
	at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$2(SparkBackend.scala:486)
	at is.hail.backend.ExecuteContext$.$anonfun$scoped$3(ExecuteContext.scala:70)
	at is.hail.utils.package$.using(package.scala:635)
	at is.hail.backend.ExecuteContext$.$anonfun$scoped$2(ExecuteContext.scala:70)
	at is.hail.utils.package$.using(package.scala:635)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
	at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:59)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:339)
	at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$1(SparkBackend.scala:483)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:482)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)

org.apache.hadoop.ipc.RemoteException: File /tmp/export-bgen-concatenated-LpLH7K29SkI8krTADdGvj7/part-0300-ee8d1e84-ecdb-45c0-a499-abfe13ac0704 could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation.
	at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2260)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2813)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:908)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:577)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:549)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:518)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)

	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1589)
	at org.apache.hadoop.ipc.Client.call(Client.java:1535)
	at org.apache.hadoop.ipc.Client.call(Client.java:1432)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118)
	at com.sun.proxy.$Proxy19.addBlock(Unknown Source)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:520)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:422)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:165)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:157)
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:359)
	at com.sun.proxy.$Proxy20.addBlock(Unknown Source)
	at org.apache.hadoop.hdfs.DFSOutputStream.addBlock(DFSOutputStream.java:1082)
	at org.apache.hadoop.hdfs.DataStreamer.locateFollowingBlock(DataStreamer.java:1898)
	at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1700)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:707)




Hail version: 0.2.107-2387bb00ceee
Error summary: RemoteException: File /tmp/export-bgen-concatenated-LpLH7K29SkI8krTADdGvj7/part-0300-ee8d1e84-ecdb-45c0-a499-abfe13ac0704 could only be written to 0 of the 1 minReplication nodes. There are 2 datanode(s) running and 2 node(s) are excluded in this operation.
	at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:2260)
	at org.apache.hadoop.hdfs.server.namenode.FSDirWriteFileOp.chooseTargetForNewBlock(FSDirWriteFileOp.java:294)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:2813)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:908)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:577)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:549)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:518)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1086)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1029)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:957)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2957)
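
For reference, the smaller run that does succeed looks roughly like this (a sketch; the gene interval shown is hypothetical):

# Restrict to a single gene's interval before exporting;
# this variant completes without error
mt_small = hl.filter_intervals(
    mt_plink,
    [hl.parse_locus_interval('chr1:1000000-2000000', reference_genome='GRCh38')])
hl.export_bgen(mt_small, out_bgen, gp=gp_values[mt_small.GT.n_alt_alleles()])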

As I am also focused on reducing costs: is writing an intermediate matrix table worth it when my ultimate goal is producing a BGEN file?

The issue was resolved when I used parallel='header_per_shard'.
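
For anyone who hits the same error, the call that worked is essentially the one above with the parallel argument added:

hl.export_bgen(mt_plink, out_bgen,
               gp=gp_values[mt_plink.GT.n_alt_alleles()],
               parallel='header_per_shard')  # one BGEN shard per partition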

I would still value others’ advice on whether writing an intermediate matrix table (then reading it back and working from that) is worth the extra compute time, especially when working with a massive matrix table.

This error (lightly reformatted for legibility):

org.apache.spark.SparkException:
 Job aborted due to stage failure:
   Task 300 in stage 1.0 failed 4 times, most recent failure:
     Lost task 300.3 in stage 1.0 (TID 2322) (all-of-us-6636-sw-1zlp.c.terra-vpc-sc-e41b7259.internal executor 370):
       org.apache.hadoop.ipc.RemoteException(java.io.IOException):
         File /tmp/export-bgen-concatenated-LpLH7K29SkI8krTADdGvj7/part-0300-ee8d1e84-ecdb-45c0-a499-abfe13ac0704
         could only be written to 0 of the 1 minReplication nodes. There are 2 
         datanode(s) running and 2 node(s) are excluded in this operation.

indicates that you are writing to HDFS (HadoopFileSystem). HDFS is a networked filesystem that is serviced only by the non-preemptible / non-spot instances in your cluster. When your cluster consists primarily of preemptible / spot instances (as it should; they are one-fifth the cost, and Hail is designed to use them), HDFS becomes effectively unusable. You have two choices:

  1. Use more non-preemptible / non-spot instances (at 5x the cost) to make HDFS more reliable.
  2. Don’t use HDFS.

I strongly recommend the latter. S3 / GCS / ABFS are more reliable, more scalable, and outlive any particular cluster. In particular, it looks like your temporary directory is set to /tmp. A very confusing aspect of Spark and Hadoop clusters is that /foo refers to /foo in HDFS; to refer to an instance’s local filesystem you must write file:/foo. The simple fix here is this:

hl.init(tmp_dir='gs://my-7-day-bucket/')

where my-7-day-bucket is a bucket with an “Object Lifecycle Management” policy. For example, Hail uses this seven-day lifecycle policy:

  "lifecycle": {
    "rule": [
      {
        "action": {
          "type": "Delete"
        },
        "condition": {
          "age": 7
        }
      }
    ]
  }

This lets you use the bucket for temporary storage, knowing that whatever is stored there will be deleted after seven days, which helps control cost. Seven days might even be too long! If all of your pipelines complete within a single day, you can use a shorter age, though I recommend giving yourself a bit of buffer.
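
If you prefer to set the policy programmatically, here is a minimal sketch using the google-cloud-storage client (the bucket name is hypothetical; gsutil or the Cloud Console work just as well):

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket('my-7-day-bucket')  # hypothetical bucket name
bucket.add_lifecycle_delete_rule(age=7)  # mirrors the JSON policy above
bucket.patch()  # persist the updated lifecycle configuration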


The parallel option avoids HDFS entirely because it writes only to the final destination. In non-parallel mode, Hail must write to a temporary location and then read from that temporary location to concatenate the shards into one file. Concatenation is rather slow anyway (it is necessarily a serial, single-machine operation), so if you can get by with sharded BGENs, I strongly recommend using them.
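
If you do need a single concatenated BGEN, a minimal sketch combining both fixes (a GCS temporary directory plus the default, non-parallel export; the bucket name is hypothetical):

import hail as hl

# Put Hail's scratch space on GCS so the concatenation step never
# touches HDFS (bucket name is hypothetical)
hl.init(tmp_dir='gs://my-7-day-bucket/tmp/')

mt = hl.read_matrix_table(f'{my_bucket}/data/hail_{curr_chr}.mt')
mt_plink = hl.split_multi_hts(mt)
gp_values = hl.literal([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0], [0.0, 0.0, 1.0]])
# Default export writes shards to tmp_dir, then concatenates them
# into a single BGEN at out_bgen
hl.export_bgen(mt_plink, out_bgen, gp=gp_values[mt_plink.GT.n_alt_alleles()])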