Running Hail on Terra -- how should I optimize?

I’m maintaining a Hail-related template notebook in the Broad’s Terra environment in a Jupyter notebook context on a Spark cluster as part of one the Biodata Catalyst grant. I am wondering what the best way to optimize for this kind of environment.

Previously, we told users to use the following configuration when running on Freeze 5b data, n=1000:

Master node: 8 CPUs, 30 GB mem, 500 GB disk space
Workers: 170 workers of which 50 are preemptible, each having 8 CPUs, 30 GB mem, and 100 GB disk space.

Now we’re running on Freeze 8 data, which has a truckload more variants. I am testing on a n=1111 cohort of genetically similar individuals, but likely due to the change to Freeze 8 the previous configuration appears inadequate. If I run only on chr 1 then I can complete the notebook with that compute in just under 50 minutes, but on all chromosomes, it appears to be taking too long or a preemptible is getting stuck. Avoiding preemptibles is an option, but it doesn’t fix the underlying issue – if at all possible we would like the notebook to be completable by users in a few hours tops for n=1000ish on Freeze 8 data, because compute will continue costing money after completion of all tasks in the notebook. Terra has a feature where it will “pause” the compute after about half an hour of no activity, but once you’re requesting this amount of resources, you are still spending about $20/hour when the compute is paused. In other words it can’t take so long that the user goes to bed and wakes up to a bunch of unnecessary extra charges.

The specific tasks being performed are related to a GWAS. The GWAS itself is performed outside of the notebook, this is the preparation. Essentially, the tasks are:

  • Loading in VCF data and creating a hail matrix (mt)
  • mt.count()
  • Merging the hail table with a pandas dataframe containing phenotypic data
  • Filtering for common variants (MAF > 0.01) via variant_qc()
  • hl.ld_prune(), which tends to take the longest
  • Perform a normalized PCA
  • Calculate a GRM

I am already in touch with several Terra engineers but I was recommended to take this question to the Hail community, as I have heard the setup of Terra is not entirely unlike common setups for Hail.

Hey @aofarrel !

Sorry you’re having performance headaches and high cost! I’m sure we can address this. Can you point us at the notebook? It’s easier for me to make suggestions on code.

Generally speaking, you don’t want to avoid preemptibles. You actually want as many preemptibles as possible for as long as possible! Hail power users often start a cluster, submit a Hail pipeline that does all the preemptible-safe computation and writes out the result, then re-configure the cluster for a subsequent pipeline that is not preemptible-safe. I wrote up some general notes here, but I’m happy to make specific comments on the notebook in question.

Looking quickly at the tasks, I’d structure this as:

  1. Import VCF write to MT. We’ll never touch the VCF again.
  2. Load MT, annotate with phenotypic data, calculate statistics, and write out just the rows of this table (much less data!). For example,
mt = mt.filter_rows(passes_qc = (mt.variant_qc.AF[0] > 0.2) & (mt.call_rate > 0.9) & ...)
rows = mt.rows()
rows =  # keep just the locus and alleles (the key fields)
rows.write('gs://.../qc_passing_variants')  # the write triggers all the QC calculations
  1. Now, for each of ld_prune, PCA, and GRM, I’d tune the cluster size, number of partitions, and preemptible to non-preemptible ratio to be successful for that operation. For each operation, I would freshly load and filter the matrix table. Once I have a set of ld_pruned variants, I’d write that out as another row table, to be used for filtering.

I’m pretty sure GRM is preemptible-safe if you set the hail temporary directory to a GCS directory (e.g. hl.init(tmp_dir='gs://my-temporary-bucket/')).

ld_prune is a notoriously difficult operation. I’ll ask someone with more background to comment here about that operation.

Yeah, babysitting pipelines is a pain. I’d ask gnomAD team how they deal with this. I’m sure they have good tactics. I personally run expensive jobs in a script where the cluster shutdown runs immediately after the submit finished—successful or not. Is there a way to replicate this behavior in Terra?

gcloud dataproc create ...
gcloud dataproc submit ...
gcloud dataproc stop ... 
1 Like

Thanks for your quick response. The notebook in question can be found here in herzog format. Herzog just allows us to represent the notebook in .py format instead of .ipynb – everything, including the markdown parts of the notebook, are in there. Anything outside of a with herzog.Cell block is not in the final rendered notebook. If you have access to Terra, you can see in in iPython format here in this template workspace. (The notebook says it is restricted access, but what I’m linking you is an empty template without data.)

Due to the nature of the notebook compute, the contents of a single notebook shares the same resources. Multiple notebooks can share resources, or these can create new computes, but there is no way to change a compute on the fly – if you decide you want more memory, you have to spin up an entirely new compute, clearing all variables in the process. So, if the GRM is in the same notebook as ld_prune (which is currently the case) then they share the same number of preemptibles, number of non-preempt workers, etc. I can split the notebooks if fine-tuning for each operation is really necessary, but since the process requires saving everything that was done so far, deleting a compute, creating a new one, spinning up the new one, re-installing hail and its dependencies in the new compute, and reloading everything necessary from the previous notebook, that also has a time loss – albiet one that tends to be in the range of about ten minutes, so if the time gain for carefully adjust the compute between these operations is large, I’m all for splitting the notebook.

As for tuning the cluster size and the ratio, that’s what I’m looking for specific advice on, especially on ld_prune. I’ve been testing several possibilities but so far have not seen any real gains on efficiency on this data, other than going from 120 non-preempt workers to 120 non-prempts+120 prempts, which reduced time on chr1 n=1111 by about just under five minutes but still taking more than 40 minutes total. I’m hesitant to keep running expensive tests with ever-increasing numbers of resources if there is knowledge out there as to what I should be aiming for.

As far as I know the cluster cannot be shut down and deleted from the notebook directly. The reason why the deletion is necessary (once the output has been saved to the separate google bucket that the rest of the workspace operates in) is that cost remains about 1/3 of what it does when active. ie, if an active compute costs about $60/hour, when paused it will cost roughly $20/hour. The user can delete the compute from the UI relatively easily, but it can’t be set to be deleted after the notebook runs, which is why I’m worried about users who say “this is taking forever, let me run it overnight” being on the hook for unnecessary costs.

I’ll just comment on the notebook in particular in this response. I’ve pinged the Hail-compiler-team to come comment on ld_prune.

  1. I’d avoid .show until you’ve written the matrix table. Hail doesn’t have smart VCF indexing strategies, so that show(5) is likely triggering more computation than anyone wants. Generally, we try to write immediately after importing vcf. Including the variant_qc and sample metadata in the MT itself is reasonable though, so I’d maybe just save the show for after the write.
  2. How many partitions does the written MT have? It looks like you switch to 25 partitions for the export and for ld prune.
  3. Oof, BlockMatrix.to_numpy is a really painful operation. It looks like you’re using that to then convert the GRM into a pandas table and output a comma separated table of text values. Most people who use the GRM in Hail do everything related to the GRM in Hail using BlockMatrix. At the very least, I’d recommend writing and reading the GRM BlockMatrix before calling to_numpy. I think the GRM is almost exclusively used as an input to Hail’s LMM module.

Hmm. Unfortunately, this notebook uses Hail in a way that doesn’t take advantage of its efficient data representations. Converting from one text format to another text format is kind of the worst-case scenario for Hail, we spend a lot of time converting to our internal binary formats only to leave them again. I think a more compelling example would use hl.linear_regression_rows as we do in our tutorial.

As I said, I’ll have someone comment on ld_prune. I’m curious if you see a performance improvement on the GRM by writing/reading before to_numpy!

1 Like

The rationale for .show() was more for so tutorial purposes. On the current test data set (aformentioned freeze 8, chr1 only, n=1111), each .show() only takes 7 seconds, so it’s not a concern to us compared to other operations. Nevertheless we can make it optional.

We set the minimum number of partitions to 200 by default. print(mt.n_partitions()) returns 200 after the import, so it seems to be going with the minimum. Is that expected behavior? Should we increase the minimum?

Thanks for your advice on the GRM – removing the numpy operation has saved several minutes. It went from nine minutes to just twenty seconds!

1 Like

Ah great!

I think hail aims for 128MB partitions by default as a good balance of memory per core and speed. If it’s going with the minimum, then that’s probably fine.

I’m pleasantly surprised to learn hail is so snappy on the VCF data!

Regarding ld_prune, the folks who know about that stuff are a bit busy at the moment so they can’t comment. Am I reading this code right that you repartition down to 25 partitions before calling ld_prune? I’d try running ld_prune on the data without any repartitioning. It’s possible there’s some bad interaction there.

We do indeed repartition to 25 as it seems to affect the writing of VCFs. Oddly enough, when I remove that repartitioning, ld_prune() seems to run much faster. However, it’s not something I can implement at the moment due to persistent errors.

In the context of ld_prune() there is a correlation between the number of partitions and the number of Spark workers (and block_size?). It’s putting together that relationship that I’m struggling with, which is making adjustments to partitioning to improve performance quite difficult.

This notebook configuration will fail on ld_prune():

  • 2 workers + 200 preemptibles

  • each worker gets 4 CPUs, 15 GB mem, and 50 GB disk

This notebook configuration will complete successfully:

  • 2 workers + 240 preemptibles

  • each worker gets 8 CPUs, 30 GB mem, and 50 GB disk

Unless I use that configration on more data. This whole time I’ve been testing on chr1, but if I expand to all chromosomes, LD_prune() fails.

I have a rough understanding of how ld_prune() operates within and outside given partitions in a three step process. But it’s not clear to me what the actual correlation between workers/cores, number of partitions, and block_size works. I saw this on ld_prune()’s documentation…

The parallelism of writing the locally-pruned matrix table as a block matrix is n_locally_pruned_variants / block_size .

…but I’m not sure how to make adjustments based on that. Should I be increasing block size? Adding more workers? Or am I barking up the wrong tree?

Here’s a stacktrace of the error:

Sorry for the high-latency responses. This is something we need to replicate and debug internally. To clarify:

You have WGS data, currently 1111 samples. This is roughly the pipeline failing:

  1. load vcf into MT
  2. annotate cols with some phenotype data
  3. run variant_qc
  4. filter rows to MAF > 0.01
  5. LD prune

I do expect LD prune to be somewhat sensitive to partitioning – we do a local in-partition prune, then write and do a matrix multiply and run a graph independence algorithm for the rest.

In many Hail pipelines, we spawn a Spark job with one task per partition of your [matrix]table. A Spark task runs on a worker (multiple tasks may run on a worker, the default configurations allow one task per core of a 4-core worker JVM).

However, the matrix operations do change partitioning here since we need to create a distributed block matrix with evenly-sized blocks. That’s the bit I’m most worried about, and most interested in replicating. I’m not sure what’s causing the crashes, though, and need to investigate further.

Also, one aside on your comment in the original post:

Using an autoscaling configuration in Google Dataproc should let your cluster scale up when there are jobs to run and scale down when there is idle time. This is a pretty effective way to use large clusters with high utilization (low $$$ waste).