Importing many sample-specific VCFs

Hi all, hope you guys are well!

I had a simple question. I have a set of ~900 sample-specific VCFs that I would like to import and combine in Hail.

I tried with a small list of two VCFs (assuming this command is correct) and am receiving the error below:

vcf_list = ["gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUCF538BRM/CASE-NEUCF538BRM-00610/CASE-NEUCF538BRM-00610-G_1.recalibrated.haplotypeCalls.vcf.gz",
            "gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUAE228FF6/CASE-NEUAE228FF6-02221/CASE-NEUAE228FF6-02221-G_1.recalibrated.haplotypeCalls.vcf.gz"]

mts = hl.import_vcf(vcf_list, force_bgz=True, reference_genome='GRCh38')

FatalError                                Traceback (most recent call last)
<ipython-input-...> in <module>
      1 mts = hl.import_vcf(vcf_list,
----> 2                     force_bgz=True, reference_genome='GRCh38')

<decorator-gen-...> in import_vcf(path, force, force_bgz, header_file, min_partitions, drop_samples, call_fields, reference_genome, contig_recoding, array_elements_required, skip_invalid_loci, entry_float_type, filter, find_replace, n_partitions, block_size, _partitions)

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    612     def wrapper(__original_func, *args, **kwargs):
    613         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 614         return __original_func(*args_, **kwargs_)
    615
    616     return wrapper

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/methods/impex.py in import_vcf(path, force, force_bgz, header_file, min_partitions, drop_samples, call_fields, reference_genome, contig_recoding, array_elements_required, skip_invalid_loci, entry_float_type, filter, find_replace, n_partitions, block_size, _partitions)
   2246                             skip_invalid_loci, force_bgz, force, filter, find_replace,
   2247                             _partitions)
-> 2248     return MatrixTable(ir.MatrixRead(reader, drop_cols=drop_samples))
   2249
   2250

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/matrixtable.py in __init__(self, mir)
    556         self._entry_indices = Indices(self, {self._row_axis, self._col_axis})
    557
--> 558         self._type = self._mir.typ
    559
    560         self._global_type = self._type.global_type

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/ir/base_ir.py in typ(self)
    359     def typ(self):
    360         if self._type is None:
--> 361             self._compute_type()
    362             assert self._type is not None, self
    363         return self._type

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/ir/matrix_ir.py in _compute_type(self)
     59
     60     def _compute_type(self):
---> 61         self._type = Env.backend().matrix_type(self)
     62
     63

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/backend/spark_backend.py in matrix_type(self, mir)
    340
    341     def matrix_type(self, mir):
--> 342         jir = self._to_java_matrix_ir(mir)
    343         return tmatrix._from_java(jir.typ())
    344

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/backend/spark_backend.py in _to_java_matrix_ir(self, ir)
    296
    297     def _to_java_matrix_ir(self, ir):
--> 298         return self._to_java_ir(ir, self._parse_matrix_ir)
    299
    300     def _to_java_blockmatrix_ir(self, ir):

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/backend/spark_backend.py in _to_java_ir(self, ir, parse)
    286             r = CSERenderer(stop_at_jir=True)
    287             # FIXME parse should be static
--> 288             ir._jir = parse(r(ir), ir_map=r.jirs)
    289         return ir._jir
    290

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/backend/spark_backend.py in _parse_matrix_ir(self, code, ref_map, ir_map)
    264
    265     def _parse_matrix_ir(self, code, ref_map={}, ir_map={}):
--> 266         return self._jbackend.parse_matrix_ir(code, ref_map, ir_map)
    267
    268     def _parse_blockmatrix_ir(self, code, ref_map={}, ir_map={}):

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/backend/spark_backend.py in deco(*args, **kwargs)
     49                 raise FatalError('%s\n\nJava stack trace:\n%s\n'
     50                                  'Hail version: %s\n'
---> 51                                  'Error summary: %s' % (deepest, full, hail.__version__, deepest), error_id) from None
     52         except pyspark.sql.utils.CapturedException as e:
     53             raise FatalError('%s\n\nJava stack trace:\n%s\n'
FatalError: HailException: invalid sample IDs: expected sample ids to be identical for all inputs. Found different sample IDs at position 0.
gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUCF538BRM/CASE-NEUCF538BRM-00610/CASE-NEUCF538BRM-00610-G_1.recalibrated.haplotypeCalls.vcf.gz: CGND-HDA-00610-b38
gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUAE228FF6/CASE-NEUAE228FF6-02221/CASE-NEUAE228FF6-02221-G_1.recalibrated.haplotypeCalls.vcf.gz: CGND-HDA-02221

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 20 times, most recent failure: Lost task 0.19 in stage 6.0 (TID 190, berylc-w-1.us-central1-b.c.human-genetics-001.internal, executor 3): is.hail.utils.HailException: invalid sample IDs: expected sample ids to be identical for all inputs. Found different sample IDs at position 0.
gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUCF538BRM/CASE-NEUCF538BRM-00610/CASE-NEUCF538BRM-00610-G_1.recalibrated.haplotypeCalls.vcf.gz: CGND-HDA-00610-b38
gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUAE228FF6/CASE-NEUAE228FF6-02221/CASE-NEUAE228FF6-02221-G_1.recalibrated.haplotypeCalls.vcf.gz: CGND-HDA-02221
at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:11)
at is.hail.utils.package$.fatal(package.scala:77)
at is.hail.io.vcf.MatrixVCFReader$$anonfun$apply$8$$anonfun$apply$9.apply(LoadVCF.scala:1592)
at is.hail.io.vcf.MatrixVCFReader$$anonfun$apply$8$$anonfun$apply$9.apply(LoadVCF.scala:1590)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at is.hail.io.vcf.MatrixVCFReader$$anonfun$apply$8.apply(LoadVCF.scala:1590)
at is.hail.io.vcf.MatrixVCFReader$$anonfun$apply$8.apply(LoadVCF.scala:1574)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1892)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1880)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2113)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2062)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2051)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:970)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:970)
at is.hail.io.vcf.MatrixVCFReader$.apply(LoadVCF.scala:1574)
at is.hail.io.vcf.MatrixVCFReader$.fromJValue(LoadVCF.scala:1652)
at is.hail.expr.ir.MatrixReader$.fromJson(MatrixIR.scala:91)
at is.hail.expr.ir.IRParser$.matrix_ir_1(Parser.scala:1693)
at is.hail.expr.ir.IRParser$$anonfun$matrix_ir$1.apply(Parser.scala:1619)
at is.hail.expr.ir.IRParser$$anonfun$matrix_ir$1.apply(Parser.scala:1619)
at is.hail.utils.StackSafe$More.advance(StackSafe.scala:64)
at is.hail.utils.StackSafe$.run(StackSafe.scala:16)
at is.hail.utils.StackSafe$StackFrame.run(StackSafe.scala:32)
at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$1.apply(Parser.scala:1959)
at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$1.apply(Parser.scala:1959)
at is.hail.expr.ir.IRParser$.parse(Parser.scala:1946)
at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1959)
at is.hail.backend.spark.SparkBackend$$anonfun$parse_matrix_ir$1.apply(SparkBackend.scala:566)
at is.hail.backend.spark.SparkBackend$$anonfun$parse_matrix_ir$1.apply(SparkBackend.scala:565)
at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:20)
at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:18)
at is.hail.utils.package$.using(package.scala:618)
at is.hail.annotations.Region$.scoped(Region.scala:18)
at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:18)
at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:240)
at is.hail.backend.spark.SparkBackend.parse_matrix_ir(SparkBackend.scala:565)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

is.hail.utils.HailException: invalid sample IDs: expected sample ids to be identical for all inputs. Found different sample IDs at position 0.
gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUCF538BRM/CASE-NEUCF538BRM-00610/CASE-NEUCF538BRM-00610-G_1.recalibrated.haplotypeCalls.vcf.gz: CGND-HDA-00610-b38
gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUAE228FF6/CASE-NEUAE228FF6-02221/CASE-NEUAE228FF6-02221-G_1.recalibrated.haplotypeCalls.vcf.gz: CGND-HDA-02221
at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:11)
at is.hail.utils.package$.fatal(package.scala:77)
at is.hail.io.vcf.MatrixVCFReader$$anonfun$apply$8$$anonfun$apply$9.apply(LoadVCF.scala:1592)
at is.hail.io.vcf.MatrixVCFReader$$anonfun$apply$8$$anonfun$apply$9.apply(LoadVCF.scala:1590)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at is.hail.io.vcf.MatrixVCFReader$$anonfun$apply$8.apply(LoadVCF.scala:1590)
at is.hail.io.vcf.MatrixVCFReader$$anonfun$apply$8.apply(LoadVCF.scala:1574)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:972)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Hail version: 0.2.58-3f304aae6ce2
Error summary: HailException: invalid sample IDs: expected sample ids to be identical for all inputs. Found different sample IDs at position 0.
gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUCF538BRM/CASE-NEUCF538BRM-00610/CASE-NEUCF538BRM-00610-G_1.recalibrated.haplotypeCalls.vcf.gz: CGND-HDA-00610-b38
gs://maze-data-external-private/answer-als/genomics/3_vcf/CASE-NEUAE228FF6/CASE-NEUAE228FF6-02221/CASE-NEUAE228FF6-02221-G_1.recalibrated.haplotypeCalls.vcf.gz: CGND-HDA-02221

Are these GVCFs (with reference blocks)? If so, you should use the VCF combiner. If not, are they from genotype arrays (same variants for every VCF)?

If the answer to both is no, we can still merge, but it’s going to be lossy and you’ll end up with some missing data where a given sample didn’t have a variant recorded in the VCF.
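(For reference, if these had been GVCFs, the combiner route would look roughly like the sketch below, using the hl.experimental.run_combiner API from this era of Hail; the bucket paths are placeholders.)

import hail as hl

gvcf_paths = [...]  # per-sample GVCFs with reference blocks

# writes a sparse MatrixTable that preserves the reference blocks losslessly
hl.experimental.run_combiner(
    gvcf_paths,
    out_file='gs://my-bucket/combined.mt',  # placeholder output path
    tmp_path='gs://my-bucket/tmp/',         # placeholder scratch location
    reference_genome='GRCh38',
)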

These are all per-sample annotated VCFs from whole genome sequencing data. There are about 900 of them. I figured Hail was the quickest way to combine them, but let me know if I'm mistaken!

I also realize this might be a duplicate of Merge vcfs like bcftools merge - #5 by njbernstein, but I couldn't quite see which part of Konrad's code he was referring to.

So they have non-overlapping variants, yes?

Also, how do you want the INFO fields handled here?

Yes, they have non-overlapping variants; those will be singletons, which we definitely want to preserve.

In terms of the INFO fields, my main interest is combining into a single VCF to be able to do variant QC, so I'd like to keep GQ and DP - but I think those come from the entries, right?
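(My understanding is that per-variant summaries could then come from entry aggregations afterwards, e.g. something like this sketch on the combined MatrixTable mt:)

# per-variant means of the entry-level GQ and DP fields
mt = mt.annotate_rows(mean_gq=hl.agg.mean(mt.GQ), mean_dp=hl.agg.mean(mt.DP))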

Here is the describe() output for one sample-specific VCF:

Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'rsid': str
    'qual': float64
    'filters': set<str>
    'info': struct {
        AC: array<int32>,
        AF: array<float64>,
        AN: int32,
        BaseQRankSum: float64,
        ClippingRankSum: float64,
        DP: int32,
        DS: bool,
        END: int32,
        ExcessHet: float64,
        FS: float64,
        HaplotypeScore: float64,
        InbreedingCoeff: float64,
        MLEAC: array<int32>,
        MLEAF: array<float64>,
        MQ: float64,
        MQ0: int32,
        MQRankSum: float64,
        NEGATIVE_TRAIN_SITE: bool,
        POSITIVE_TRAIN_SITE: bool,
        QD: float64,
        RAW_MQ: float64,
        ReadPosRankSum: float64,
        SOR: float64,
        VQSLOD: float64,
        VariantType: str,
        culprit: str
    }
----------------------------------------
Entry fields:
    'AB': float64
    'AD': array<int32>
    'DP': int32
    'GQ': int32
    'GT': call
    'MIN_DP': int32
    'MQ0': int32
    'PGT': call
    'PID': str
    'PL': array<int32>
    'RGQ': int32
    'SB': array<int32>
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------

Also, just to be extra clear, here is an example of what the VCF itself looks like.

Here’s something that may work:

import math
import os
from logging import info

import hail as hl


def multi_way_union_mts(mts: list, tmp_dir: str, chunk_size: int) -> hl.MatrixTable:
    """Joins MatrixTables in the provided list with a hierarchical (tree)
    merge: chunks of `chunk_size` tables are zip-joined and checkpointed at
    each stage until a single table remains.

    :param list mts: list of MatrixTables to join together
    :param str tmp_dir: path to temporary directory for intermediate results
    :param int chunk_size: number of MatrixTables to join per chunk
    :return: joined MatrixTable
    :rtype: MatrixTable
    """
    # Turn each MatrixTable into a Table whose rows carry an array of entry
    # structs and whose columns are stored in a global field.
    staging = [mt.localize_entries("__entries", "__cols") for mt in mts]
    stage = 0
    while len(staging) > 1:
        n_jobs = int(math.ceil(len(staging) / chunk_size))
        info(f"multi_way_union_mts: stage {stage}: {n_jobs} total jobs")
        next_stage = []
        for i in range(n_jobs):
            to_merge = staging[chunk_size * i : chunk_size * (i + 1)]
            info(
                f"multi_way_union_mts: stage {stage} / job {i}: merging {len(to_merge)} inputs"
            )
            merged = hl.Table.multi_way_zip_join(to_merge, "__entries", "__cols")
            # Inputs with no row at a given locus come back missing; replace
            # them with an array of missing entries so the flattened entry
            # array lines up with the flattened column list.
            merged = merged.annotate(
                __entries=hl.flatten(
                    hl.range(hl.len(merged.__entries)).map(
                        lambda i: hl.coalesce(
                            merged.__entries[i].__entries,
                            hl.range(hl.len(merged.__cols[i].__cols)).map(
                                lambda j: hl.null(
                                    merged.__entries.__entries.dtype.element_type.element_type
                                )
                            ),
                        )
                    )
                )
            )
            merged = merged.annotate_globals(
                __cols=hl.flatten(merged.__cols.map(lambda x: x.__cols))
            )
            # Optional sanity check; this forces an extra pass over the data.
            print(merged.aggregate((hl.agg.stats(hl.len(merged.__entries)), hl.len(merged.__cols))))
            next_stage.append(
                merged.checkpoint(
                    os.path.join(tmp_dir, f"stage_{stage}_job_{i}.ht"), overwrite=True
                )
            )
        info(f"done stage {stage}")
        stage += 1
        staging.clear()
        staging.extend(next_stage)
    # Convert the final Table back to a MatrixTable.
    return (
        staging[0]
        ._unlocalize_entries("__entries", "__cols", list(mts[0].col_key))
        .unfilter_entries()
    )

You can use this with:

paths = [...]
mt_result = multi_way_union_mts(
    [hl.import_vcf(path, ...) for path in paths],
    tmp_dir='gs://your-bucket/tmp',  # some bucket, if you are running on the cloud
    chunk_size=64,  # should work well
)
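Under the hood this relies on localize_entries, which turns each MatrixTable into a Table whose rows carry an array of entry structs (with the columns moved into a global field) so that hl.Table.multi_way_zip_join can combine them. A toy illustration on simulated data:

mt = hl.balding_nichols_model(n_populations=1, n_samples=3, n_variants=5)
ht = mt.localize_entries('__entries', '__cols')
ht.describe()  # rows now carry an array<struct{...}> of entries; columns are a global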

Great, thanks Tim! I tried this on the small list of two VCFs and it works. I'll try it on the larger list now. Appreciate it!

Things like variant_qc / sample_qc won't work out of the box, I think, because you need to fill in missing hom-refs first. Something like:

mt = mt.annotate_entries(present_in_original=True)
mt = mt.unfilter_entries()  # turn filtered entries into missing entries
mt = mt.annotate_entries(GT=hl.coalesce(mt.GT, hl.call(0, 0)))  # fill in hom-refs
mt = mt.annotate_entries(present_in_original=hl.coalesce(mt.present_in_original, False))
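After that, the standard QC methods should run; a sketch (the present_in_original flag lets you distinguish filled-in hom-refs from original calls):

mt = hl.variant_qc(mt)  # adds mt.variant_qc with call_rate, gq_stats, dp_stats, ...
mt = hl.sample_qc(mt)
# e.g. count how many entries per variant were actually present in the inputs
mt = mt.annotate_rows(n_original_calls=hl.agg.count_where(mt.present_in_original))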