Subsetting large data

I have a very large whole-genome dataset with about 700 million SNPs, and I am trying to subset it for further analysis. I have applied several filters: heterozygosity, minor allele frequency, Hardy-Weinberg equilibrium, etc. Before applying the filters, I can run very simple commands on this dataset very quickly, like count() or show(). After filtering, however, these same commands can take hours. I suspect part of the reason is that after subsetting there are still over 100,000 partitions, and the process of loading them all into memory is what is so expensive. I found that when I repartition the data and save it as a new MatrixTable, I can load that repartitioned dataset and run subsequent commands on it very quickly. My issue is that the repartitioning itself is very expensive: it takes several hours even using many machines.

I am new to working with data this size and to Hail, so I am wondering if the Hail team has any advice for me. Is there some way to speed up subsetting and repartitioning a dataset? Also, because the count() command alone is so expensive, I don't know how large the subsetted data are, and therefore how many partitions would be ideal. Is there a faster way to count the number of SNPs after subsetting?

Here is some of my code:

import os
import hail as hl
hl.init(default_reference="GRCh38")
mt_wgs_path = os.getenv("WGS_HAIL_STORAGE_PATH")
mt_wgs = hl.read_matrix_table(mt_wgs_path, _n_partitions=20000)
# Heterozygosity rate
mt_wgs = mt_wgs.filter_rows(hl.is_missing(mt_wgs.filters))
# Minor allele frequency (MAF)
mt_wgs = hl.variant_qc(mt_wgs)
mt_wgs = mt_wgs.filter_rows(hl.min(mt_wgs.variant_qc.AF) > 0.01, keep=True)
# Hardy–Weinberg equilibrium (HWE)
mt_wgs = mt_wgs.filter_rows(mt_wgs.variant_qc.p_value_hwe > 1e-10, keep=True)
# Filter SNPs with more than 2 alleles
mt_wgs = mt_wgs.filter_rows(hl.len(mt_wgs.alleles) == 2)
# Filter SNPs with alleles longer than 1 character
mt_wgs = mt_wgs.filter_rows(hl.len(mt_wgs.alleles[0]) == 1)
mt_wgs = mt_wgs.filter_rows(hl.len(mt_wgs.alleles[1]) == 1)
# Call rate
mt_wgs = hl.sample_qc(mt_wgs, name='sample_qc')
# filter for samples that are > 99% call rate
mt_wgs = mt_wgs.filter_cols(mt_wgs.sample_qc.call_rate > 0.99)
# filter for SNPs that are > 99% call rate
mt_wgs = mt_wgs.filter_rows(mt_wgs.variant_qc.call_rate > 0.99)
# Re-partition
mt = mt_wgs.repartition(20000, shuffle=True)
# Save (This is the part that takes many hours)
mt_path = my_bucket + "/data/wgs_ss.mt"
mt.write(mt_path)

I think you should avoid using repartition – use _n_partitions on read as you’ve done at the top. repartition, especially with shuffle=True, is super slow. In general, you should think about repartitioning after you’ve already written data with too many partitions, not before.
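As a rough sketch of what that could look like (the second output path and the 5000 below are placeholder values, not recommendations):

# Write the filtered MatrixTable directly; no repartition() call is needed,
# since _n_partitions=20000 was already applied when the data was read.
mt_wgs.write(my_bucket + "/data/wgs_ss.mt")

# If a dataset you have already written has too many partitions, coalesce it
# by re-reading with _n_partitions and writing a new copy.
mt = hl.read_matrix_table(my_bucket + "/data/wgs_ss.mt", _n_partitions=5000)
mt.write(my_bucket + "/data/wgs_ss_repart.mt")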

The reason count() and show() are so fast before filtering is that they don't really have to do much – MatrixTables store their number of rows and cols, so count() isn't reading any data other than metadata when it appears right after a read. If count() appears after a variant_qc and filter_rows, then Hail needs to do a lot of work to compute that count!
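As a small illustration (using the path from your code):

mt = hl.read_matrix_table(mt_wgs_path)
mt.count()  # fast: row and column counts come straight from the file's metadata

mt = hl.variant_qc(mt)
mt = mt.filter_rows(mt.variant_qc.call_rate > 0.99)
mt.count()  # slow: Hail must run variant_qc and evaluate the filter over every partition

So the cheapest way to get the post-filter count is usually to write the filtered dataset first and then call count() on the freshly read result, since the new file stores its own row and column counts.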

Which would be better:
(1) setting _n_partitions on read, subsetting the data, then saving, or
(2) reading the data with default partitioning, subsetting the data, saving with the original number of partitions, and then repartitioning on read and saving the repartitioned dataset?

When I tried (1), I think the partitions were too large to read into memory, so my jobs were failing.

Hey @kforth !

I’m sorry to hear (1) didn’t work! Can you share a bit more detail on what happened or a stack trace? Based on the code you shared, I would not expect memory issues. For most operations, Hail fastidiously avoids reading the whole partition into memory and instead streams through the partition. In those cases, partitioning mostly controls the amount of parallelism available to Hail, not the memory requirements.

Based on the os.getenv call, I wonder: are you using an on-prem cluster to do this analysis? In that case, you need to explicitly tell Java/Spark (a library on which Hail depends) how much memory to use. We have a post about how to do that: How do I increase the memory or RAM available to the JVM when I start Hail through Python?
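If it does turn out you need to raise the JVM memory, a minimal sketch of the idea looks like this (the memory values are placeholders to tune for your machines, and hl.init must run before any other Hail call in the session):

import hail as hl

hl.init(
    default_reference="GRCh38",
    spark_conf={
        "spark.driver.memory": "8g",    # example value only
        "spark.executor.memory": "8g",  # example value only
    },
)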