I have a question about scaling when using Hail in a Jupyter notebook on Terra. Are there ways to improve the write time when saving a matrix? I don't believe the write function benefits from a Spark cluster, whereas with export_vcf, for example, you can save as shards. I am currently building a tutorial that uses Hail, and saving a matrix with 1,000 individuals takes 50 minutes; we expect users to scale this analysis to 10,000+ individuals. The notebook imports a VCF, converts it to a Hail matrix, filters through various steps, and then writes the Hail matrix. So I am not sure whether a batch job is best for these interactive filtering steps.
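(For reference, the sharded export I have in mind looks roughly like this; the output path is just a placeholder:)

hl.export_vcf(
    geno,                                        # the filtered MatrixTable from the notebook
    'gs://my-bucket/common-variants.vcf.bgz',    # placeholder path
    parallel='header_per_shard'                  # write one shard per partition, in parallel
)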
The notebook imports a VCF
What line of code are you using for this?
geno = (
    hl
    .import_vcf(
        vcf_paths, force_bgz=True
    )
)
OK, great. Can you share the whole script, possibly?
# time/timedelta are used to time the write below; sc, vcf_paths, bucket, and the
# samples_traits_for_analysis DataFrame come from earlier notebook cells (not shown)
import time
from datetime import timedelta

import hail as hl
import bokeh.io
from bokeh.io import *
from bokeh.resources import INLINE

bokeh.io.output_notebook(INLINE)
hl.init(sc, default_reference='GRCh38', log='population-genetics.log')

# Import the block-gzipped VCFs as a Hail MatrixTable
geno = (
    hl
    .import_vcf(
        vcf_paths, force_bgz=True
    )
)

# Convert the pandas phenotype table to a Hail Table keyed by sample ID
samples_traits_for_analysis = (
    hl
    .Table
    .from_pandas(
        samples_traits_for_analysis,
        key='nwd_id'
    )
)

# Annotate samples with phenotypes and keep those with a systolic blood pressure value
geno = (
    geno.annotate_cols(pheno=samples_traits_for_analysis[geno.s])
)
geno = (
    geno
    .filter_cols(hl.is_defined(geno.pheno['blood_pressure_test_systolic_blood_pressure']), keep=True)
)

# Compute variant QC metrics and keep common variants (alternate allele frequency > 1%)
geno = hl.variant_qc(geno)
geno = (
    geno
    .filter_rows(
        geno.variant_qc.AF[1] > 0.01
    )
)

# Time the write of the filtered MatrixTable
start_matrix_write_time = time.time()
geno.write(bucket + 'common-variants.mt', overwrite=True)
elapsed_write_time = time.time() - start_matrix_write_time
print(timedelta(seconds=elapsed_write_time))
# 0:50:13.987171
OK, thanks.
This should all be pretty fast –
How many variants/samples are there across the VCFs? How big is your cluster?
Also, do you see a message like “ordering unsorted dataset with network shuffle”?
1052 samples, 6.5 million variants
Notebook setup:
Attribute | Value
--- | ---
Profile | Custom
Spark cluster | Yes
Master CPUs | 4
Master disk size | 100 GB
Master memory | 15 GB
Workers | 4
Worker CPUs | 4
Worker disk size | 50 GB
Worker memory | 15 GB
Yes, I get that message after running:
samples_traits_for_analysis = (
    hl
    .Table
    .from_pandas(
        samples_traits_for_analysis,
        key='nwd_id'
    )
)
OK, this still seems quite slow.
One potential issue is that I think Terra is running an extremely old version of Hail, from before the 0.2 release. We’ve dramatically improved performance in the last year.
Initializing Spark and Hail with default parameters…
Running on Apache Spark version 2.2.3
SparkUI available at http://10.128.0.69:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.11-daed180b84d8
Is it possible to load whatever samples_traits_for_analysis is directly from a TSV file into Hail? It's possible from_pandas is the cause of your woes.
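Something along these lines (the TSV path is a placeholder; impute=True just infers the column types):

samples_traits_for_analysis = hl.import_table(
    'gs://my-bucket/samples_traits.tsv',   # placeholder path
    impute=True,    # infer column types from the data
    key='nwd_id'    # key by sample ID for the join against the VCF samples
)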
I'm also curious about the performance of this pipeline on a modern Hail cluster (0.2.11 is 8 months old). Is it possible to run this same script on a Google Dataproc Spark cluster created using hailctl (it's installed by default when you pip install hail on your laptop)? If the main issue is the old version of Hail, we'll need to encourage Terra to update to Hail 0.2.25.
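The hailctl workflow would be roughly the following (the cluster and script names here are placeholders):

hailctl dataproc start hail-benchmark --num-workers 4
hailctl dataproc submit hail-benchmark pipeline.py
hailctl dataproc stop hail-benchmark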
In any case, 50 minutes is entirely unacceptable performance for this pipeline; we will help you address this.
I tried loading samples_traits_for_analysis directly as a TSV and it did not improve the write time. I think it will be a little complicated for me to test this on the Google Dataproc Spark cluster because I am using controlled access data, but I can check on this. I can also reach out to the Terra team about updating their Hail version.
If you can give us the schemas and sizes of each of the inputs here, we can probably simulate some data with the same shape to benchmark.
samples_traits_for_analysis is 7 columns × 1052 rows, where individuals are rows. Three columns are strings and four are integers. One column is the key that matches an individual's traits with its genotypes in the VCF.
The VCF is 11.81 GB and contains 1152 individuals and 471 million variants. It is very large because the variants were jointly called across many studies to enable comparisons of multiple studies and then individual studies were subsetted. This input VCF represents only individuals from one study, and the final Hail matrix is only variants found in this set of individuals (6.5 million).
12G seems very small for 471M variants. What is the schema of the VCF?
Do you have a suggested tool I could use in a Python notebook to look at this? What specifically would you like, the VCF header?
Header, or mt.describe()
mt.describe() shows this:
Global fields:
    None
Column fields:
    's': str
Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'rsid': str
    'qual': float64
    'filters': set<str>
    'info': struct {}
Entry fields:
    'GT': call
Column key: ['s']
Row key: ['locus', 'alleles']
Can you clarify the size of the VCF? Do you have a 470 million variant VCF that is 11GB or do you have a 6 million variant VCF that is 11 GB? Which VCF are you importing into Hail?
When I first open the VCF in Hail as a matrix, I use summarize_variants() and it lists 471 million variants total. This represents all variants jointly called across all individuals in the larger database (100,000+ individual genomes). I then filter the matrix to variants with allele frequency above 0.01 in this set of ~1,100 individuals, which leaves 6.5 million variants; this removes rare variants and some loci that are not variable in this set of individuals. Does that make sense?
We've been discussing this internally a bit. We're mostly in agreement that your file is somewhat of a zip bomb: while it might only be 11 GB, it decompresses more than 100x to over 2 TB. It compresses so well for two reasons: first, you only have the hard calls (GT), which compress very well on their own; and second, you have ~450M rows of that VCF where the genotypes are exactly the same (all hom-ref). This will compress extremely well!
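As a rough back-of-the-envelope (assuming about 4 bytes of VCF text per hard-call genotype, e.g. "0/0" plus a delimiter):

n_variants = 471_000_000
n_samples = 1152
bytes_per_genotype = 4   # assumed: "0/0" plus a tab
uncompressed_bytes = n_variants * n_samples * bytes_per_genotype
print(uncompressed_bytes / 1e12)   # ~2.2 TB of genotype text, versus ~12 GB compressed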
We might be able to get a few fold improvement here (I’d try switching to the newest version of Hail, for instance), but your best bet for making this much faster is increasing the size of the cluster you’re running on.