Failing to write in Hail format after loading a ~1 TB bgzipped VCF

We would like to load a ~1 TB bgzipped VCF and write it in Hail format, but we’re running into issues (it fails with what seem to be memory errors). We tried on a subset (5k+ lines) and it works great.

We’re running on Amazon EMR, and as far as I can tell, only one instance is being used at any time.

Try 1 (1 master + 3 core instances, 32 GB memory per instance; failed at Stage 1 after ~11 hrs)

import hail as hl

hl.import_vcf("s3://path/file.vcf.gz", force=True, reference_genome="GRCh38").write("s3://path/hailout/file.mt", overwrite=True)

Try 2 (1 master + 1 core instance, 128 GB memory per instance, 2 TB EBS; Stage 1 completed after 14 hrs, then it failed at Stage 2 after 6 hrs and 45 GB of input)

import hail as hl

hl.import_vcf("s3://path/file.vcf.gz", force=True, reference_genome="GRCh38").write("s3://path/hailout/file.mt", overwrite=True, stage_locally=True)

Error message from pyspark-submit output is essentially: Fatal Python error: Cannot recover from stack overflow.

  • is the read time expected (14 hrs)?

  • is the read/write expected to be parallel?

  • how much memory and space should we be allocating for this? any ballpark estimation methods?

  • is there anything else we should be looking out for in terms of cluster specs, method parameters, etc., to make it faster and have it complete? change min_partitions?

  • can/should we do a per-chromosome load? what would be the best way to go about this: import_vcf with a list of VCFs, or import_vcf and write each chromosome separately and then merge them (somehow)? (see the sketch just after this list for roughly what we mean)
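To illustrate the last option: import_vcf accepts a list of paths, so a per-chromosome split could in principle be loaded as a single dataset in one call. A rough sketch with made-up file names, assuming the split files share the same samples/header and are block-gzipped with a .bgz extension so Hail reads them in parallel:

import hail as hl

# Hypothetical per-chromosome file layout; adjust to the real paths.
chroms = [f"chr{i}" for i in range(1, 23)] + ["chrX", "chrY"]
paths = [f"s3://path/split/{c}.vcf.bgz" for c in chroms]

# import_vcf takes a list of VCFs and loads them as one MatrixTable.
mt = hl.import_vcf(paths, reference_genome="GRCh38")
mt.write("s3://path/hailout/combined.mt", overwrite=True)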

the doc on the force parameter in import_vcf:

  • force (bool) – If True, load .vcf.gz files serially. No downstream operations can be parallelized, so this mode is strongly discouraged.

I would expect this to take forever (but not necessarily to fail with OOM…).

You want the force_bgz argument!
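In other words, the import would look roughly like this (a sketch using the same placeholder paths as above):

import hail as hl

hl.import_vcf(
    "s3://path/file.vcf.gz",
    force_bgz=True,              # read the block-gzipped .gz in parallel instead of serially
    reference_genome="GRCh38",
).write("s3://path/hailout/file.mt", overwrite=True)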

We have a pull request to add a warning in this case, too.

Thank you!! I’m working with the OP on this. Not long after posting, we realised by combing through the docs that we should be using force_bgz instead, and it works great now. I must not have read the HailException carefully enough (I probably just read the beginning, where it said "treat as bgzipped", and the end of the paragraph, where it said force=True; my bad).

Write still takes 4x as long, but I guess that is expected.

Thanks again.

Write still takes 4x as long

4x as long as what? Sorry, don’t know what you mean.

I’d also say that you’re using a pretty small cluster for a 1 TB VCF - I think the people around here would probably use ~500 cores at least for that task.

Hail scales pretty well, so even though you pay more per hour, the task finishes faster and the cost doesn’t go up much.
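As a rough illustration of that point (all numbers below are made-up assumptions, not measurements): if the job is approximately a fixed amount of core-hours and scaling is close to linear, the bill stays about the same while the wall-clock time shrinks.

# Hypothetical back-of-the-envelope: assume ~144 core-hours of total work
# and a flat price per core-hour (both numbers are illustrative only).
core_hours = 144.0
price_per_core_hour = 0.05  # assumed combined EC2 + EMR price, USD

for cores in (16, 512):
    wall_clock_hours = core_hours / cores      # perfect linear scaling assumed
    cost = core_hours * price_per_core_hour    # cost unchanged under that assumption
    print(f"{cores} cores: ~{wall_clock_hours:.1f} h wall clock, ~${cost:.2f}")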

When viewing the job on the AWS console, the work is split into the following stages:

JobID - 1
Description - collect at ContextRDD.scala:157
Details - org.apache.spark.rdd.RDD.collect(RDD.scala:944)
is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:157)
is.hail.rvd.RVD$.getKeyInfo(RVD.scala:1065)
is.hail.rvd.RVD$.makeCoercer(RVD.scala:1129)
is.hail.io.vcf.MatrixVCFReader.coercer$lzycompute(LoadVCF.scala:1110)
is.hail.io.vcf.MatrixVCFReader.coercer(LoadVCF.scala:1110)
is.hail.io.vcf.MatrixVCFReader.apply(LoadVCF.scala:1139)
is.hail.expr.ir.MatrixRead.execute(MatrixIR.scala:363)…
Duration - 1.6h

And

JobID - 2
Description - collect at ContextRDD.scala:157
Details - org.apache.spark.rdd.RDD.collect(RDD.scala:944)
is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:157)
is.hail.io.RichContextRDDRegionValue$.writeRowsSplit$extension(RowStore.scala:1431)
is.hail.rvd.RVD.writeRowsSplit(RVD.scala:686)
is.hail.expr.ir.MatrixValue.write(MatrixValue.scala:177)…
Duration - 9.1h

This second job/stage is the one I’m saying takes roughly 4x as long as the first.

Totally agree the cluster is small; we were just troubleshooting this part, so we wanted to see how it went. If increasing cores will help, we will absolutely do that.

Thanks!

Ah. The first stage parses the VCF to scan the variants and check their ordering; it doesn’t need to parse the genotypes. The second stage parses all the data, writes it, compresses it, etc.
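One related knob from the questions above is min_partitions: asking import_vcf for more partitions gives both of those stages more tasks to spread across a larger cluster. A minimal sketch, with an arbitrary example value:

import hail as hl

mt = hl.import_vcf(
    "s3://path/file.vcf.gz",
    force_bgz=True,
    reference_genome="GRCh38",
    min_partitions=2000,   # example value only; more partitions means more parallel tasks
)
mt.write("s3://path/hailout/file.mt", overwrite=True)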