Hello,
This analysis is on UK Biobank data in the DNAnexus environment.
hail version: 0.2.116-cd64e0876c94
My goal is to create a MatrixTable covering the full genome for my subset of samples. I am using the VDS combiner to combine ~8,000 gVCFs into a VDS, which I then plan to densify into a MatrixTable for analysis. I am currently running on 100 low-priority AWS instances of type mem3_ssd1_v2_x16. I have tried increasing executor memory, as shown in the config below, but with no luck. The code below succeeds for a handful of samples, but as soon as I scale up I hit the failure shown below. I have tried the full cohort (n=8,000) as well as subsets of 1,000, with no luck. Do I need to scale down even more? Any ideas or suggestions would be appreciated.
Relevant code:
import pandas as pd
import numpy as np
import os
import dxpy
import glob
import subprocess
import json

try:
    from pyspark.sql import SparkSession
    import hail as hl

    builder = (
        SparkSession
        .builder
        .config("spark.executor.cores", 8)
        .config("spark.executor.memory", "100g")
        .enableHiveSupport()
    )
    spark = builder.getOrCreate()
    hl.init(sc=spark.sparkContext, default_reference='GRCh38')
except ModuleNotFoundError:
    pass

# Output database for the combined VDS
db_name = "hail"
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
spark.sql(stmt)
db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']

# Temporary database for the combiner's intermediate files
db_name = "hail_temp"
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
spark.sql(stmt)
db_uri_temp = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
temp_url = f"dnax://{db_uri_temp}/"

# files_wanted is a DataFrame of gVCF paths (constructed earlier, not shown);
# split its first column into chunks of roughly 1,000 samples each
vcf_chunks = np.array_split(files_wanted[0], round(files_wanted.shape[0]/1000))
for i, vcfs in enumerate(vcf_chunks):
    vds_url = f"dnax://{db_uri}/samples_subset_{i}.vds"
    vcfs = vcfs.to_list()
    combiner = hl.vds.combiner.new_combiner(
        output_path=vds_url,
        gvcf_paths=vcfs,
        temp_path=temp_url,
        use_genome_default_intervals=True,
    )
    combiner.run()
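
For context, once a VDS is successfully written, the downstream step I plan to run is roughly the following (a sketch only; the output paths here are placeholders I have not actually run yet):

# Planned downstream step (never reached, since the combiner fails):
# read the combined VDS back and densify it into a MatrixTable for analysis.
vds = hl.vds.read_vds(f"dnax://{db_uri}/samples_subset_0.vds")
mt = hl.vds.to_dense_mt(vds)
mt.write(f"dnax://{db_uri}/samples_subset_0_dense.mt")

The combiner output and error from the failing run: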
2024-05-18 18:08:05.950 Hail: INFO: Running VDS combiner:
VDS arguments: 0 datasets with 0 samples
GVCF arguments: 1000 inputs/samples
Branch factor: 100
GVCF merge batch size: 50
2024-05-18 18:08:07.135 Hail: INFO: GVCF combine (job 1): merging 1000 GVCFs into 10 datasets
---------------------------------------------------------------------------
FatalError Traceback (most recent call last)
Cell In[10], line 1
----> 1 combiner.run()
File /opt/conda/lib/python3.9/site-packages/hail/vds/combiner/variant_dataset_combiner.py:336, in VariantDatasetCombiner.run(self)
334 while not self.finished:
335 self.save()
--> 336 self.step()
337 self.save()
338 info('Finished VDS combiner!')
File /opt/conda/lib/python3.9/site-packages/hail/vds/combiner/variant_dataset_combiner.py:393, in VariantDatasetCombiner.step(self)
391 return
392 if self._gvcfs:
--> 393 self._step_gvcfs()
394 else:
395 self._step_vdses()
File /opt/conda/lib/python3.9/site-packages/hail/vds/combiner/variant_dataset_combiner.py:492, in VariantDatasetCombiner._step_gvcfs(self)
488 merge_metadata = [VDSMetadata(path=temp_path + str(count).rjust(pad, '0') + '.vds',
489 n_samples=n_samples)
490 for count, n_samples in enumerate(merge_n_samples)]
491 paths = [md.path for md in merge_metadata]
--> 492 hl.vds.write_variant_datasets(merge_vds, paths, overwrite=True, codec_spec=FAST_CODEC_SPEC)
493 for md in merge_metadata:
494 self._vdses[max(1, floor(log(md.n_samples, self._branch_factor)))].append(md)
File /opt/conda/lib/python3.9/site-packages/hail/vds/methods.py:22, in write_variant_datasets(vdss, paths, overwrite, stage_locally, codec_spec)
19 ref_writer = ir.MatrixNativeMultiWriter([f"{p}/reference_data" for p in paths], overwrite, stage_locally,
20 codec_spec)
21 var_writer = ir.MatrixNativeMultiWriter([f"{p}/variant_data" for p in paths], overwrite, stage_locally, codec_spec)
---> 22 Env.backend().execute(ir.MatrixMultiWrite([vds.reference_data._mir for vds in vdss], ref_writer))
23 Env.backend().execute(ir.MatrixMultiWrite([vds.variant_data._mir for vds in vdss], var_writer))
File /opt/conda/lib/python3.9/site-packages/hail/backend/py4j_backend.py:82, in Py4JBackend.execute(self, ir, timed)
80 return (value, timings) if timed else value
81 except FatalError as e:
---> 82 raise e.maybe_user_error(ir) from None
File /opt/conda/lib/python3.9/site-packages/hail/backend/py4j_backend.py:76, in Py4JBackend.execute(self, ir, timed)
74 # print(self._hail_package.expr.ir.Pretty.apply(jir, True, -1))
75 try:
---> 76 result_tuple = self._jbackend.executeEncode(jir, stream_codec, timed)
77 (result, timings) = (result_tuple._1(), result_tuple._2())
78 value = ir.typ._from_encoding(result)
File /cluster/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /opt/conda/lib/python3.9/site-packages/hail/backend/py4j_backend.py:35, in handle_java_exception.<locals>.deco(*args, **kwargs)
33 tpl = Env.jutils().handleForPython(e.java_exception)
34 deepest, full, error_id = tpl._1(), tpl._2(), tpl._3()
---> 35 raise fatal_error_from_java_error_triplet(deepest, full, error_id) from None
36 except pyspark.sql.utils.CapturedException as e:
37 raise FatalError('%s\n\nJava stack trace:\n%s\n'
38 'Hail version: %s\n'
39 'Error summary: %s' % (e.desc, e.stackTrace, hail.__version__, e.desc)) from None
FatalError: ClassNotFoundException: is.hail.sparkextras.OriginUnionPartition
Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 784 in stage 1.0 failed 4 times, most recent failure: Lost task 784.3 in stage 1.0 (TID 1827) (ip-10-60-7-97.eu-west-2.compute.internal executor 99): java.lang.ClassNotFoundException: is.hail.sparkextras.OriginUnionPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1988)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1852)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2548)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2493)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2492)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2492)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1250)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1250)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1250)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2736)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2678)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2667)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1022)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:176)
at is.hail.rvd.RVD$.$anonfun$writeRowsSplitFiles$8(RVD.scala:1344)
at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
at is.hail.rvd.RVD$.writeRowsSplitFiles(RVD.scala:1346)
at is.hail.expr.ir.MatrixValue$.writeMultiple(MatrixValue.scala:285)
at is.hail.expr.ir.MatrixNativeMultiWriter.apply(MatrixWriter.scala:1533)
at is.hail.expr.ir.WrappedMatrixNativeMultiWriter.apply(TableWriter.scala:745)
at is.hail.expr.ir.Interpret$.run(Interpret.scala:863)
at is.hail.expr.ir.Interpret$.alreadyLowered(Interpret.scala:59)
at is.hail.expr.ir.LowerOrInterpretNonCompilable$.evaluate$1(LowerOrInterpretNonCompilable.scala:20)
at is.hail.expr.ir.LowerOrInterpretNonCompilable$.rewrite$1(LowerOrInterpretNonCompilable.scala:67)
at is.hail.expr.ir.LowerOrInterpretNonCompilable$.apply(LowerOrInterpretNonCompilable.scala:72)
at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.transform(LoweringPass.scala:67)
at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$3(LoweringPass.scala:16)
at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
at is.hail.expr.ir.lowering.LoweringPass.$anonfun$apply$1(LoweringPass.scala:16)
at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:81)
at is.hail.expr.ir.lowering.LoweringPass.apply(LoweringPass.scala:14)
at is.hail.expr.ir.lowering.LoweringPass.apply$(LoweringPass.scala:13)
at is.hail.expr.ir.lowering.LowerOrInterpretNonCompilablePass$.apply(LoweringPass.scala:62)
at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1(LoweringPipeline.scala:22)
at is.hail.expr.ir.lowering.LoweringPipeline.$anonfun$apply$1$adapted(LoweringPipeline.scala:20)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:20)
at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:50)
at is.hail.backend.spark.SparkBackend._execute(SparkBackend.scala:463)
at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$2(SparkBackend.scala:499)
at is.hail.backend.ExecuteContext$.$anonfun$scoped$3(ExecuteContext.scala:75)
at is.hail.utils.package$.using(package.scala:635)
at is.hail.backend.ExecuteContext$.$anonfun$scoped$2(ExecuteContext.scala:75)
at is.hail.utils.package$.using(package.scala:635)
at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:17)
at is.hail.backend.ExecuteContext$.scoped(ExecuteContext.scala:63)
at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:351)
at is.hail.backend.spark.SparkBackend.$anonfun$executeEncode$1(SparkBackend.scala:496)
at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
at is.hail.backend.spark.SparkBackend.executeEncode(SparkBackend.scala:495)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
java.lang.ClassNotFoundException: is.hail.sparkextras.OriginUnionPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1988)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1852)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2186)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:466)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Hail version: 0.2.116-cd64e0876c94
Error summary: ClassNotFoundException: is.hail.sparkextras.OriginUnionPartition
Log file: hail-20240518-1807-0.2.116-cd64e0876c94.log (uploaded to GitHub)
Thanks,
Andrew