Parallel Hail Tasks

A general question, but my understanding is that functions (while the operation is parallelized) are done serially. Is there a way to invoke 2 hail functions that are read only in parallel?

E.g. can we write a mt to disk and export it to elasticsearch at the same time?

Nope, unfortunately not. However, you can probably speed things up by writing this workflow as:

  1. write as native Table / MatrixTable
  2. read that object
  3. export to elasticsearch

Thanks Tim. I was trying to interleave both IO operations so they can be done in parallel, since both can take a long time and are not computation bound. Your suggestion is still a serial process right? Wouldn’t that take longer than what we currently have, which is calling write and then export on the same mt?

Yes, still serial.

But that will be much faster than:

mt = hl.variant_qc(mt)

mt.write(...)
hl.export_elasticsearch(mt...)

Hail builds a computational graph and executes it lazily, which means that in the above example, the variant_qc is getting run twice (once for write, once for export.

If you have expensive upstream operations, it will almost always be faster to write/read/export.

Thanks Tim, that makes sense, very helpful. I’ll double check if that’s what’s going on.

Hi guys, but if you do

vds = hc.import_vcf("test.vcf.gz")
vds = vds.vep("vep.config")
vds.write("test.vds")
kt = vds.variants_table()
kt.export_elasticsearch("localhost", 9200, "test", "test", 100, config={})

this only runs VEP once, right?

nope, twice.

errr wait yeah that’s right

I think VEP is special and only gets run once, since its results are persisted.

great. I see the persist here

Is there a way to tell how many times spark executed a given step?

oops, this is actually totally wrong. See [bugfix] Fix persisting of vep, logistic regression, poisson regression by tpoterba · Pull Request #5416 · hail-is/hail · GitHub for a fix.

Hmm, this isn’t super easy. We can’t really do that just in Spark, but could add some tooling on our side.

Awesome, glad a fix came out of this. Do you guys know if this non-persisting was in v01? If it wasn’t persisting, then we were re-computing that multiple times?

this wasn’t an issue in 0.1. It was probably only introduced ~6 weeks ago

Thanks Tim.

Ps. I found these Spark UI visualizations

but it looks somewhat complicated to get access to them on dataproc.

Sorry to open up an old conversation: is VEP no longer persisting on version 0.2.32? I am possibly confused on the semantics here.

The call to persist appears to be gone in the latest VEP.scala, and looking at my executor stderr logs VEP is definitely being run twice between writing to disk and writing to Elasticsearch, despite manually persisting the MatrixTable in between.

It’s not a big deal to rewrite the pipeline, just not sure what’s expected. It does seem that the behavior has changed.

Maybe it’s not working as intended, but the idea was to move the persist from Scala to python: https://github.com/hail-is/hail/pull/5416

Great, thanks for the clarification on the expected behavior.

It does seem that the MatrixTable.persist method does not actually persist VEP; I have to do MatrixTable.write and read from the written table in order to use VEP-annotated variants downstream without rerunning VEP.

I’ll look into this. Just to clarify, you have code like:

vepped = hl.methods.vep(mt)
vepped = vepped.persist() # Important to save the persisted thing to a variable
vepped.write(....)
vepped.export_elasticsearch(...) #I forget what the elasticsearch method is called

Before:

vepped = hl.methods.vep(mt)
vepped = vepped.persist()
annotated = vepped.index_rows(some stuff)
annotated = annotated.persist() # VEP runs again during this step - again, I thought it wasn't supposed to
annoated.write(path)
final = read_matrix_table(path)
hl.export_elasticsearch(annoated)

My fix to get around VEP recomputing:

vepped = hl.methods.vep(mt)
vepped = vepped.persist()
vepped.write('tmpmt.mt')
vepped = read_matrix_table('tmpmt.mt')
annotated = vepped.index_rows(some stuff)
annotated = annotated.persist() 
annoated.write(path)
final = read_matrix_table(path)
hl.export_elasticsearch(annoated)

Just to assuage my own confusion: I thought the expected behavior was that VEP would not re-run after persisting.