VDS combiner unsuccessful on large cohort

Hello,
This analysis uses UK Biobank data within the DNANexus environment.
Hail version: 0.2.116-cd64e0876c94

My goal is to create a whole-genome MatrixTable for my subset of samples. I am using the VDS combiner to merge ~8,000 gVCFs into a VDS, which I then hope to densify into a MatrixTable for analysis. I am currently running on 100 low-priority AWS instances of type mem3_ssd1_v2_x16. I have tried increasing executor memory, as shown in the Spark config below, but with no luck. The code below succeeds for a couple of samples, but as soon as I scale up I run into the error shown further down. I have tried the full cohort (n=8,000) as well as subsets of 1,000 samples, with no luck either way. Do I need to scale down even further? Any ideas or suggestions would be appreciated.

Relevant code:

import pandas as pd
import numpy as np
import os
import dxpy
import glob
import subprocess
import json

try:
    from pyspark.sql import SparkSession
    import hail as hl

    builder = (
        SparkSession
        .builder
        .config("spark.executor.cores", 8)
        .config("spark.executor.memory", "100g")
        .enableHiveSupport()
    )
    spark = builder.getOrCreate()
    hl.init(sc=spark.sparkContext, default_reference='GRCh38')
except ModuleNotFoundError:
    pass


# Database for the combiner output.
db_name = "hail"
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
spark.sql(stmt)
db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']

# Database for the combiner's temporary files.
db_name = "hail_temp"
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
spark.sql(stmt)
db_uri_temp = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
temp_url = f"dnax://{db_uri_temp}/"

# files_wanted is a pandas DataFrame whose first column holds the gVCF paths
# (built earlier; not shown). Split it into chunks of ~1,000 samples and run
# the combiner on each chunk separately.
vcf_chunks = np.array_split(files_wanted[0], round(files_wanted.shape[0] / 1000))
for i, vcfs in enumerate(vcf_chunks):
    vds_url = f"dnax://{db_uri}/samples_subset_{i}.vds"
    vcfs = vcfs.to_list()
    combiner = hl.vds.combiner.new_combiner(
        output_path=vds_url,
        gvcf_paths=vcfs,
        temp_path=temp_url,
        use_genome_default_intervals=True,
    )
    combiner.run()
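
Once a chunk VDS is written, the downstream step I plan to run is roughly the following (a minimal sketch; the read path is a placeholder for one of the per-chunk VDS URLs above):

# Sketch of the intended downstream densification step (not reached yet).
vds = hl.vds.read_vds(f"dnax://{db_uri}/samples_subset_0.vds")  # one of the chunk VDSes
mt = hl.vds.to_dense_mt(vds)

The combiner itself, however, fails with the output below: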

2024-05-18 18:08:05.950 Hail: INFO: Running VDS combiner:
    VDS arguments: 0 datasets with 0 samples
    GVCF arguments: 1000 inputs/samples
    Branch factor: 100
    GVCF merge batch size: 50
2024-05-18 18:08:07.135 Hail: INFO: GVCF combine (job 1): merging 1000 GVCFs into 10 datasets
---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
Cell In[10], line 1
----> 1 combiner.run()

File /opt/conda/lib/python3.9/site-packages/hail/vds/combiner/variant_dataset_combiner.py:336, in VariantDatasetCombiner.run(self)
    334 while not self.finished:
    335     self.save()
--> 336     self.step()
    337 self.save()
    338 info('Finished VDS combiner!')

File /opt/conda/lib/python3.9/site-packages/hail/vds/combiner/variant_dataset_combiner.py:393, in VariantDatasetCombiner.step(self)
    391     return
    392 if self._gvcfs:
--> 393     self._step_gvcfs()
    394 else:
    395     self._step_vdses()

File /opt/conda/lib/python3.9/site-packages/hail/vds/combiner/variant_dataset_combiner.py:492, in VariantDatasetCombiner._step_gvcfs(self)
    488 merge_metadata = [VDSMetadata(path=temp_path + str(count).rjust(pad, '0') + '.vds',
    489                               n_samples=n_samples)
    490                   for count, n_samples in enumerate(merge_n_samples)]
    491 paths = [md.path for md in merge_metadata]
--> 492 hl.vds.write_variant_datasets(merge_vds, paths, overwrite=True, codec_spec=FAST_CODEC_SPEC)
    493 for md in merge_metadata:
    494     self._vdses[max(1, floor(log(md.n_samples, self._branch_factor)))].append(md)

File /opt/conda/lib/python3.9/site-packages/hail/vds/methods.py:22, in write_variant_datasets(vdss, paths, overwrite, stage_locally, codec_spec)
     19 ref_writer = ir.MatrixNativeMultiWriter([f"{p}/reference_data" for p in paths], overwrite, stage_locally,
     20                                         codec_spec)
     21 var_writer = ir.MatrixNativeMultiWriter([f"{p}/variant_data" for p in paths], overwrite, stage_locally, codec_spec)
---> 22 Env.backend().execute(ir.MatrixMultiWrite([vds.reference_data._mir for vds in vdss], ref_writer))
     23 Env.backend().execute(ir.MatrixMultiWrite([vds.variant_data._mir for vds in vdss], var_writer))

File /opt/conda/lib/python3.9/site-packages/hail/backend/py4j_backend.py:82, in Py4JBackend.execute(self, ir, timed)
     80     return (value, timings) if timed else value
     81 except FatalError as e:
---> 82     raise e.maybe_user_error(ir) from None

File /opt/conda/lib/python3.9/site-packages/hail/backend/py4j_backend.py:76, in Py4JBackend.execute(self, ir, timed)
     74 # print(self._hail_package.expr.ir.Pretty.apply(jir, True, -1))
     75 try:
---> 76     result_tuple = self._jbackend.executeEncode(jir, stream_codec, timed)
     77     (result, timings) = (result_tuple._1(), result_tuple._2())
     78     value = ir.typ._from_encoding(result)

File /cluster/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/conda/lib/python3.9/site-packages/hail/backend/py4j_backend.py:35, in handle_java_exception.<locals>.deco(*args, **kwargs)
     33     tpl = Env.jutils().handleForPython(e.java_exception)
     34     deepest, full, error_id = tpl._1(), tpl._2(), tpl._3()
---> 35     raise fatal_error_from_java_error_triplet(deepest, full, error_id) from None
     36 except pyspark.sql.utils.CapturedException as e:
     37     raise FatalError('%s\n\nJava stack trace:\n%s\n'
     38                      'Hail version: %s\n'
     39                      'Error summary: %s' % (e.desc, e.stackTrace, hail.__version__, e.desc)) from None

FatalError: ClassNotFoundException: is.hail.sparkextras.OriginUnionPartition

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 784 in stage 1.0 failed 4 times, most recent failure: Lost task 784.3 in stage 1.0 (TID 1827) (ip-10-60-7-97.eu-west-2.compute.internal executor 99): java.lang.ClassNotFoundException: is.hail.sparkextras.OriginUnionPartition
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1988)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1852)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2548)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2493)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2492)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2492)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1250)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1250)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1250)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2736)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2678)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2667)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:176)
	at is.hail.rvd.RVD$.$anonfun$writeRowsSplitFiles$8(RVD.scala:1344)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.rvd.RVD$.writeRowsSplitFiles(RVD.scala:1346)
	at is.hail.expr.ir.MatrixValue$.writeMultiple(MatrixValue.scala:285)
	at is.hail.expr.ir.MatrixNativeMultiWriter.apply(MatrixWriter.scala:1533)
	at is.hail.expr.ir.WrappedMatrixNativeMultiWriter.apply(TableWriter.scala:745)
	at is.hail.expr.ir.Interpret$.run(Interpret.scala:863)
	at is.hail.expr.ir.Interpret$.alreadyLowered(Interpret.scala:59)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:20)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.rewrite$1(LowerOrInterpretNonCompilable.scala:67)
	at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:72)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:67)
	at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$3(LoweringPass.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$1(LoweringPass.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
	at is.hail.expr.ir.lowering.LoweringPass.apply(LoweringPass.scala:14)
	at is.hail.expr.ir.lowering.LoweringPass.apply$(LoweringPass.scala:13)
	at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:62)
	at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1(LoweringPipeline.scala:22)
	at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1$adapted(LoweringPipeline.scala:20)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:20)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:50)
	at is.hail.backend.spark.SparkBackend._execute(SparkBackend.scala:463)
	at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$2(SparkBackend.scala:499)
	at is.hail.backend.ExecuteContext$.$anonfun$scoped$3(ExecuteContext.scala:75)
	at is.hail.utils.package$.using(package.scala:635)
	at is.hail.backend.ExecuteContext$.$anonfun$scoped$2(ExecuteContext.scala:75)
	at is.hail.utils.package$.using(package.scala:635)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
	at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:63)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:351)
	at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$1(SparkBackend.scala:496)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:495)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)

java.lang.ClassNotFoundException: is.hail.sparkextras.OriginUnionPartition
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1988)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1852)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
	at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)




Hail version: 0.2.116-cd64e0876c94
Error summary: ClassNotFoundException: is.hail.sparkextras.OriginUnionPartition

Log file: hail-20240518-1807-0.2.116-cd64e0876c94.log

Thanks,
Andrew

I managed to get past this step by setting import_interval_size=40_000 and use_genome_default_intervals=False. However, this was incredibly slow and was likely going to take 100+ hours using 100 instances of mem2_ssd1_v2_x16.
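
For reference, the only change relative to the code above was in the new_combiner call (a sketch; everything else was left unchanged):

combiner = hl.vds.combiner.new_combiner(
    output_path=vds_url,
    gvcf_paths=vcfs,
    temp_path=temp_url,
    use_genome_default_intervals=False,
    import_interval_size=40_000,  # smaller import intervals -> many more, smaller partitions
)
combiner.run()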

Hi @anh151, apologies for the late reply, and I'm sorry you're having trouble with Hail. This error suggests that DNANexus is installing Hail improperly:

FatalError: ClassNotFoundException: is.hail.sparkextras.OriginUnionPartition

Unfortunately, the Hail team has no visibility into how DNANexus configures tools on its platform. I wish I could help with this particular problem, but I think your best bet is to contact DNANexus and report that their Hail installation is broken. They can contact us to work out how to install Hail properly, but ultimately they will need to make the fix.
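
For what it's worth, a ClassNotFoundException for an is.hail.* class on an executor generally means the Hail jar never reached the executors' classpath. A minimal sketch of how that is typically wired up when building the Spark session is below; the jar location is an assumption about the DNANexus image and may well differ there:

from pyspark.sql import SparkSession
import os
import hail

# Assumption: the pip-installed hail package ships hail-all-spark.jar under
# hail/backend/; adjust the path if DNANexus packages it differently.
hail_jar = os.path.join(os.path.dirname(hail.__file__), "backend", "hail-all-spark.jar")

builder = (
    SparkSession
    .builder
    .config("spark.jars", hail_jar)                                   # ship the jar with the job
    .config("spark.driver.extraClassPath", hail_jar)                  # driver classpath
    .config("spark.executor.extraClassPath", "./hail-all-spark.jar")  # executors load the shipped copy
    .enableHiveSupport()
)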
