I have a very large whole-genome dataset with about 700 million SNPs that I am trying to subset for further analysis. I have applied several filters: heterozygosity, minor allele frequency, Hardy–Weinberg equilibrium, etc. Before applying the filters, very simple commands like count() or show() run on this dataset very quickly. After filtering, however, these same commands can take hours. I suspect part of the reason is that the subsetted data still has over 100,000 partitions, and loading them all into memory is what is so expensive.

I found that if I repartition the filtered data and save it as a new matrix table, I can load that repartitioned dataset back in and run subsequent commands on it very quickly. My issue is that the repartitioning itself is very expensive: it takes several hours even with many machines. I am new to working with data of this size and new to Hail, so I am wondering if the Hail team has any advice for me. Is there some way to speed up subsetting and repartitioning a dataset?

Also, because the count() command alone is so expensive, I don't know how large the subsetted data are, and therefore how many partitions would be ideal. Is there a faster way to count the number of SNPs after subsetting?
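For concreteness, these are the kinds of calls I mean by "simple commands" above; mt_wgs here is the filtered matrix table from the code below:

# Quick on the raw matrix table, but takes hours once the filters below have been applied
n_snps, n_samples = mt_wgs.count()
mt_wgs.rows().show(5)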
Here is some of my code:
import os
import hail as hl
hl.init(default_reference="GRCh38")
mt_wgs_path = os.getenv("WGS_HAIL_STORAGE_PATH")
mt_wgs = hl.read_matrix_table(mt_wgs_path, _n_partitions=20000)
# Heterozygosity rate: keep only variants with nothing in the 'filters' field
mt_wgs = mt_wgs.filter_rows(hl.is_missing(mt_wgs.filters))
# Minor allele frequency (MAF)
mt_wgs = hl.variant_qc(mt_wgs)
mt_wgs = mt_wgs.filter_rows(hl.min(mt_wgs.variant_qc.AF) > 0.01, keep=True)
# Hardy–Weinberg equilibrium (HWE)
mt_wgs = mt_wgs.filter_rows(mt_wgs.variant_qc.p_value_hwe > 1e-10, keep=True)
# Filter SNPs with more than 2 alleles
mt_wgs = mt_wgs.filter_rows(hl.len(mt_wgs.alleles) == 2)
# Keep only variants whose alleles are a single base each (i.e. drop indels)
mt_wgs = mt_wgs.filter_rows(hl.len(mt_wgs.alleles[0]) == 1)
mt_wgs = mt_wgs.filter_rows(hl.len(mt_wgs.alleles[1]) == 1)
# Call rate
mt_wgs = hl.sample_qc(mt_wgs, name='sample_qc')
# Filter for samples with > 99% call rate
mt_wgs = mt_wgs.filter_cols(mt_wgs.sample_qc.call_rate > 0.99)
# Filter for SNPs with > 99% call rate
mt_wgs = mt_wgs.filter_rows(mt_wgs.variant_qc.call_rate > 0.99)
# Re-partition
mt = mt_wgs.repartition(20000, shuffle=True)
# Save (This is the part that takes many hours)
mt_path = my_bucket + "/data/wgs_ss.mt"  # my_bucket (output bucket path) is defined earlier in my code
mt.write(mt_path)
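Once the write finishes, I read the repartitioned table back in, and at that point the same commands are quick (mt_ss is just the name I use for the re-read table):

# Reading the repartitioned copy back in; count() and show() are fast on this table
mt_ss = hl.read_matrix_table(mt_path)
n_snps, n_samples = mt_ss.count()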