Spark–Hail failure mode triggered by .write() on large MatrixTables

Hello,

I am trying to extract the set of variants (the variant "universe") from the UKB DRAGEN WGS 500k pVCFs across all autosomes, starting with chromosome 21. I am using a Python notebook on a Spark cluster with the HAIL feature, instance type mem1_ssd1_v2_x16 with 50 nodes. When I call .write(), it raises the traceback below after about 1 hour:

ClassNotFoundException: is.hail.backend.spark.SparkBackend$$anon$5$RDDPartition.

Could someone help?

This is my code:
import hail as hl
import pyspark
import os

conf = pyspark.SparkConf()
conf.set("spark.network.timeout", "600s")
conf.set("spark.executor.heartbeatInterval", "120s")
conf.set("spark.driver.maxResultSize", "0")
sc = pyspark.SparkContext.getOrCreate(conf=conf)

# Stop any Hail session left over from a previous cell
try:
    hl.stop()
except Exception:
    pass

hl.init(sc=sc, default_reference='GRCh38', idempotent=True)
print("--- Hail successfully started on Cluster ---")
vcf_path = 'file:///mnt/project/Bulk/WGS_500k_Clean/chr21/*.vcf.gz'

# Temporary path to save the MatrixTable (helps to speed up later queries)
mt_checkpoint_path = 'file:///mnt/project/Bulk/WGS_500k_Clean/chr21/checkpoint_temp.mt'
output_tsv_path = 'file:///mnt/project/Bulk/WGS_500k_Clean/chr21/chr21_variants.tsv'

print("--- START TO IMPORT VCF ---")

# 2. Import VCF with optimized parameters for UK Biobank
mt = hl.import_vcf(
    vcf_path,
    force_bgz=True,
    reference_genome='GRCh38',
    array_elements_required=False,
    skip_invalid_loci=True,
)

# 3. Save to .mt (checkpoint) format
mt.write(mt_checkpoint_path, overwrite=True)
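
The step I planned after the checkpoint (a sketch, assuming the write succeeds) was to read the MatrixTable back and export the variant keys to the TSV path defined above:

# Planned follow-up (sketch): export the row keys (locus, alleles) to TSV
mt2 = hl.read_matrix_table(mt_checkpoint_path)
mt2.rows().select().export(output_tsv_path)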

The traceback it raised:
---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
Cell In[7], line 16
     12 print("--- CONVERTING TO MATRIX TABLE (The most challenging step) ---")
     13 # 3. Save to .mt (Checkpoint) format
     14 # This step will utilize all 50 machines to run.
     15 # After this step is complete, everything will be extremely fast.
---> 16 mt.write(mt_checkpoint_path, overwrite=True)
     18 print("--- CONVERSION SUCCESSFUL ---")

File <string>:2, in write(self, output, overwrite, stage_locally, _codec_spec, _partitions)

File /opt/conda/lib/python3.12/site-packages/hail/typecheck/check.py:585, in _make_dec.<locals>.wrapper(__original_func, *args, **kwargs)
    582 @decorator
    583 def wrapper(__original_func: Callable[..., T], *args, **kwargs) -> T:
    584     args, kwargs = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585     return __original_func(*args, **kwargs)

File /opt/conda/lib/python3.12/site-packages/hail/matrixtable.py:2810, in MatrixTable.write(self, output, overwrite, stage_locally, _codec_spec, _partitions)
   2807     _partitions_type = None
   2809 writer = ir.MatrixNativeWriter(output, overwrite, stage_locally, _codec_spec, _partitions, _partitions_type)
-> 2810 Env.backend().execute(ir.MatrixWrite(self._mir, writer))

File /opt/conda/lib/python3.12/site-packages/hail/backend/spark_backend.py:217, in SparkBackend.execute(self, ir, timed)
    214     except Exception as fatal:
    215         raise err from fatal
--> 217 raise err

File /opt/conda/lib/python3.12/site-packages/hail/backend/spark_backend.py:209, in SparkBackend.execute(self, ir, timed)
    207 def execute(self, ir: BaseIR, timed: bool = False) -> Any:
    208     try:
--> 209         return super().execute(ir, timed)
    210     except Exception as err:
    211         if self._copy_log_on_error:

File /opt/conda/lib/python3.12/site-packages/hail/backend/backend.py:181, in Backend.execute(self, ir, timed)
    179     result, timings = self._rpc(ActionTag.EXECUTE, payload)
    180 except FatalError as e:
--> 181     raise e.maybe_user_error(ir) from None
    182 if ir.typ == tvoid:
    183     value = None

File /opt/conda/lib/python3.12/site-packages/hail/backend/backend.py:179, in Backend.execute(self, ir, timed)
    177 payload = ExecutePayload(self._render_ir(ir), '{"name":"StreamBufferSpec"}', timed)
    178 try:
--> 179     result, timings = self._rpc(ActionTag.EXECUTE, payload)
    180 except FatalError as e:
    181     raise e.maybe_user_error(ir) from None

File /opt/conda/lib/python3.12/site-packages/hail/backend/py4j_backend.py:221, in Py4JBackend._rpc(self, action, payload)
    219 if resp.status_code >= 400:
    220     error_json = orjson.loads(resp.content)
--> 221     raise fatal_error_from_java_error_triplet(
    222         error_json['short'], error_json['expanded'], error_json['error_id']
    223     )
    224 return resp.content, resp.headers.get('X-Hail-Timings', '')

FatalError: ClassNotFoundException: is.hail.backend.spark.SparkBackend$$anon$5$RDDPartition

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 27696 in stage 2.0 failed 4 times, most recent failure: Lost task 27696.3 in stage 2.0 (TID 35011) (ip-10-60-137-110.eu-west-2.compute.internal executor 80): java.lang.ClassNotFoundException: is.hail.backend.spark.SparkBackend$$anon$5$RDDPartition
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2956)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2886)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2885)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2885)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1341)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1341)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1341)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3161)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3095)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3084)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1073)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
at is.hail.backend.spark.SparkBackend.$anonfun$parallelizeAndComputeWithIndex$4(SparkBackend.scala:455)
at is.hail.backend.spark.SparkBackend.$anonfun$parallelizeAndComputeWithIndex$4$adapted(SparkBackend.scala:454)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at is.hail.backend.spark.SparkBackend.parallelizeAndComputeWithIndex(SparkBackend.scala:454)
at is.hail.backend.BackendUtils.collectDArray(BackendUtils.scala:85)
at __C5Compiled.apply(Emit.scala)
at is.hail.expr.ir.LoweredTableReader$.$anonfun$makeCoercer$15(TableIR.scala:402)
at is.hail.backend.ExecuteContext.$anonfun$scopedExecution$1(ExecuteContext.scala:144)
at is.hail.utils.package$.using(package.scala:673)
at is.hail.backend.ExecuteContext.scopedExecution(ExecuteContext.scala:144)
at is.hail.expr.ir.LoweredTableReader$.makeCoercer(TableIR.scala:401)
at is.hail.expr.ir.GenericTableValue.getLTVCoercer(GenericTableValue.scala:187)
at is.hail.expr.ir.GenericTableValue.toTableStage(GenericTableValue.scala:220)
at is.hail.io.vcf.MatrixVCFReader.lower(LoadVCF.scala:2140)
at is.hail.expr.ir.TableReader.lower(TableIR.scala:610)
at is.hail.expr.ir.lowering.LowerTableIR$.applyTable(LowerTableIR.scala:1062)
at is.hail.expr.ir.lowering.LowerTableIR$.lower$1(LowerTableIR.scala:728)
at is.hail.expr.ir.lowering.LowerTableIR$.apply(LowerTableIR.scala:1021)
at is.hail.expr.ir.lowering.LowerToCDA$.lower(LowerToCDA.scala:27)
at is.hail.expr.ir.lowering.LowerToCDA$.apply(LowerToCDA.scala:11)
at is.hail.expr.ir.lowering.LowerToDistributedArrayPass.transform(LoweringPass.scala:91)
at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:27)
at is.hail.expr.ir.LowerOrInterpretNonCompilable$.rewrite$1(LowerOrInterpretNonCompilable.scala:59)
at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:64)
at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:83)
at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$3(LoweringPass.scala:32)
at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:84)
at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$1(LoweringPass.scala:32)
at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:84)
at is.hail.expr.ir.lowering.LoweringPass.apply(LoweringPass.scala:30)
at is.hail.expr.ir.lowering.LoweringPass.apply$(LoweringPass.scala:29)
at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:78)
at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1(LoweringPipeline.scala:21)
at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1$adapted(LoweringPipeline.scala:19)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:19)
at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:45)
at is.hail.backend.spark.SparkBackend._execute(SparkBackend.scala:578)
at is.hail.backend.spark.SparkBackend.$anonfun$execute$4(SparkBackend.scala:614)
at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:84)
at is.hail.backend.spark.SparkBackend.$anonfun$execute$3(SparkBackend.scala:609)
at is.hail.backend.spark.SparkBackend.$anonfun$execute$3$adapted(SparkBackend.scala:608)
at is.hail.backend.ExecuteContext$.$anonfun$scoped$3(ExecuteContext.scala:78)
at is.hail.utils.package$.using(package.scala:673)
at is.hail.backend.ExecuteContext$.$anonfun$scoped$2(ExecuteContext.scala:78)
at is.hail.utils.package$.using(package.scala:673)
at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:13)
at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:65)
at is.hail.backend.spark.SparkBackend.$anonfun$withExecuteContext$2(SparkBackend.scala:411)
at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:55)
at is.hail.utils.ExecutionTimer$.logTime(ExecutionTimer.scala:62)
at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:397)
at is.hail.backend.spark.SparkBackend.execute(SparkBackend.scala:608)
at is.hail.backend.BackendHttpHandler.handle(BackendServer.scala:88)
at jdk.httpserver/com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:77)
at jdk.httpserver/sun.net.httpserver.AuthFilter.doFilter(AuthFilter.java:82)
at jdk.httpserver/com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:80)
at jdk.httpserver/sun.net.httpserver.ServerImpl$Exchange$LinkHandler.handle(ServerImpl.java:725)
at jdk.httpserver/com.sun.net.httpserver.Filter$Chain.doFilter(Filter.java:77)
at jdk.httpserver/sun.net.httpserver.ServerImpl$Exchange.run(ServerImpl.java:694)
at jdk.httpserver/sun.net.httpserver.ServerImpl$DefaultExecutor.execute(ServerImpl.java:159)
at jdk.httpserver/sun.net.httpserver.ServerImpl$Dispatcher.handle(ServerImpl.java:446)
at jdk.httpserver/sun.net.httpserver.ServerImpl$Dispatcher.run(ServerImpl.java:412)
at java.base/java.lang.Thread.run(Thread.java:829)

java.lang.ClassNotFoundException: is.hail.backend.spark.SparkBackend$$anon$5$RDDPartition
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)

Hail version: 0.2.132-678e1f52b999
Error summary: ClassNotFoundException: is.hail.backend.spark.SparkBackend$$anon$5$RDDPartition

Hi @nlbxuyen,

This looks like an installation issue. What versions of Java and PySpark is your Spark cluster using?

If memory serves, the latest version of Hail uses Spark 3.5.3 and Java 11. Try upgrading your Hail version; failing that, you may have to build Hail from source.
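
One way to check all three from inside the notebook (a sketch):

import subprocess
import pyspark
import hail as hl

print('PySpark:', pyspark.__version__)
print('Hail:', hl.version())
# 'java -version' writes to stderr, hence .stderr below
print(subprocess.run(['java', '-version'], capture_output=True, text=True).stderr)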

Best,

Hi @ehigham, the UKB Spark cluster uses PySpark 3.5.2 and Hail 0.2.132. The documentation notes that when the HAIL feature is chosen for a Spark cluster, the Hail Java library is installed on all cluster nodes.

Hi @nlbxuyen,

What version of Java are you using?
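
Also, since it is the executors that fail to load a Hail class, it may be worth confirming that the SparkContext you construct yourself actually carries the Hail JAR. A sketch, where the JAR location inside the pip package is an assumption and may differ on the UKB image:

# What is the running context actually configured with?
print(sc.getConf().get('spark.jars', '<not set>'))
print(sc.getConf().get('spark.executor.extraClassPath', '<not set>'))

# Hail's docs list these properties as required when you build the context yourself:
from pathlib import Path
import hail as hl

hail_jar = str(Path(hl.__file__).parent / 'backend' / 'hail-all-spark.jar')  # assumed pip layout
conf.set('spark.jars', hail_jar)
conf.set('spark.driver.extraClassPath', hail_jar)
conf.set('spark.executor.extraClassPath', './hail-all-spark.jar')
conf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
conf.set('spark.kryo.registrator', 'is.hail.kryo.HailKryoRegistrator')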

Thanks,

I used Java 11.0.13 for this.
Thank you