Performance after MatrixTable filtering (repartition question)


#1

Dear Hail Team,

First, thanks for this awesome software! I’m using hail for UKB analyses and am having performance issues after sample/variant filtering of a MatrixTable.

code to read/subset ukb bgens

hl_imaging = hl_df.filter(hl.is_nan(hl_df.age_at_scan) == False)
ht_variants = hl.read_table(’/gpfs/milgram/data/UKB/ukb_snp/tmp_variants.ht’)

mt = hl.import_bgen(path=bgen_files,
entry_fields=[‘GT’],
sample_file=sample_files[0],
index_file_map=index_files)

mt = mt.filter_cols(hl.is_defined(hl_imaging[mt.s]))
mt = mt.filter_rows(hl.is_defined(ht_variants[mt.locus, mt.alleles]))

…at this point, if I execute any method like mt.count_rows() or mt.write(), hail will execute using the original number of partitions for the bgen data (~18k). e.g.,

In [24]: mt.count_rows()
2018-12-18 18:04:09 Hail: INFO: Coerced sorted dataset
2018-12-18 18:04:19 Hail: INFO: Coerced sorted dataset
2018-12-18 18:04:24 Hail: INFO: Coerced sorted dataset
[Stage 38:> (0 + 20) / 18257]

I’m only on a 20 core machine, but the execution seems very slow. If I understand correctly, running:

mt = mt.repartition(500, shuffle=False)

should handle this by reducing the number of tasks being sent to each core and decrease execution time? The catch is that running .repartition() doesn’t seem to change the number of tasks being executed (i.e. it remains at 18,257). Is there another method that should be called after MatrixTable subsetting?

I’m new to the Hail environment, so this is probably a dumb mistake on my part. But any help would be hugely appreciated!

Thank you,
Kevin


#2

Hi Kevin,
Sorry things are running slowly! The UKBB BGEN is a tremendous amount of data, and processing it on 20 cores is going to take a while.

There are probably steps we can take to speed things up, though.

First, about repartition: There are three types of repartitioning schemes in Hail:

  1. repartition(n, shuffle=True): This mode is best used after massive filters, or to increase partitioning. Data shuffles are slow, but this doesn’t require a two-pass algorithm to rebalance partitions.
  2. repartition(n, shuffle=False) This mode is useful if you want to reduce partitioning when you don’t know how balanced the partitions are. It is a two-pass algorithm, requiring a first pass to estimate which partitions to combine. For instance, if I start with 1000 partitions, and filter out the latter half of my data (resulting in 500 empty partitions), then if I repartition down to 500 partitions, I should drop out all later partitions instead of grouping [0, 1], [2, 3], etc.
  3. naive_coalesce(n) This mode of repartitioning does the totally naive thing of grouping together adjacent partitions, and is one pass.

I don’t think you necessarily need to repartition here, though. If you want fewer partitions, you can specify that on import_bgen with the n_partitions parameter. But you should certainly be able to process 20,000.

What is the full pipeline you’re running? It’s possible that something else here is very slow, and we can help identify that.


#3

Hi Tim,

Thanks for your response!

I will be using hail for genetic analyses of novel neuroimaging phenotypes. To start though, I’m creating a GWAS pipeline that I’ll validate with height/BMI phenotypes. I’m referencing the online hail 0.2 tutorial and the Hail 0.1 repo here (https://github.com/Nealelab/UK_Biobank_GWAS). This first bit of code is meant to subset the UKB data to around 25k imaging subjects and UKB-defined 90k variants used to produce a GRM.

It takes about 1hr+ on 20 cores to execute “hl.export_gen” or “.count_rows()” for a MatrixTable of size (ncol=25,330, nrow=93,511). I’m still

Here is a bit more info. Would love your input on whether there is a mistake in my code, or if things are running about as expected given my machine specs.

possibly relevant env variable from .bashrc script:

export PYSPARK_SUBMIT_ARGS="–driver-memory 50g --executor-memory 50g pyspark-shell"

Make file lists:

# bgen/index/sample file lists
bgen_files = [os.path.join(imputed_dir, ‘ukb_imp_chr{:}_v3.bgen’).format(i) for i in np.arange(1,23)]
index_files = {os.path.join(imputed_dir, ‘ukb_imp_chr{:}_v3.bgen’).format(i): os.path.join(imputed_dir, ‘ukb_imp_chr{:}_v3.bgen.idx2’).format(i) for i in np.arange(1,23)}
sample_files = [os.path.join(imputed_dir, ‘ukb25163_imp_chr{:}_v3_s487324.sample’).format(i) for i in np.arange(1,23)]

subset UKB phenotype data to imaging subjects

#nrow=25,330, ncol=11
hl_imaging = hl_df.filter(hl.is_nan(hl_df.age_at_scan) == False)
38%20PM

read UKB variant QC data

ht_variants = hl.import_table(’/gpfs/milgram/data/UKB/ukb_snp/ukb_snp_qc.txt’, delimiter=’ ')
# annotate locus, alleles fields in variant table
ht_variants = ht_variants.annotate(locus=hl.locus(ht_variants[‘chromosome’].replace(‘23’, ‘X’).replace(‘24’, ‘Y’).replace(‘25’, ‘X’).replace(‘26’, ‘MT’),
hl.int(ht_variants[‘position’]),
‘GRCh37’),
alleles=hl.array([ht_variants[‘allele1_ref’], ht_variants[‘allele2_alt’]]))

Key variant table and subset to variants used in GRM creation

ht_variants = ht_variants.key_by(ht_variants[‘locus’], ht_variants[‘alleles’])
# filter to variants used in UKBB relatedness inference
ht_variants = ht_variants.filter(ht_variants[‘in_Relatedness’] == ‘1’)
#ht_variants now has 93,511 rows

# import matrix table with imaging samples and all autosomal variants
mt = hl.import_bgen(path=bgen_files,
entry_fields=[‘GT’],
sample_file=sample_files[0],
index_file_map=index_files,
n_partitions=1000)

# subset matrix table columns to samples in imaging data
mt = mt.filter_cols(hl.is_defined(hl_imaging[mt.s]))

# subset each matrix table to only variants used in relatedness inference
mt = mt.filter_rows(hl.is_defined(ht_variants[mt.locus, mt.alleles]))

hl.export_gen(mt, ‘/gpfs/milgram/data/UKB/ukb_snp/test’)


#4

(sorry, Tim here, we are in the middle of setting up HTTPS for this forum and seem to have broken Github and Google auth temporarily)

It takes about 1hr+ on 20 cores to execute “hl.export_gen” or “.count_rows()” for a MatrixTable of size (ncol=25,330, nrow=93,511)

It’s important to note that even though these might be the final dimensions of the MatrixTable, Hail has to do a LOT more work - it has to start by importing and decoding the entire initial BGEN file, and do work to subset variants and samples.

If this is starting from the 90M by 500K BGEN, this will take a while! The files may be ~2T on disk, but they are extremely well compressed and decompress about 50x. 100 terabytes is a ton of data!

100 Terabytes with 20 cores is 5 terabytes per core hour, or around 1.5G per core second. It’s hard to do much better than this!

However, in your case, you’re doing a huge variant filter. We have a way to do this much more efficiently – there’s a “variants” parameter on import_bgen, which expects a variant-keyed table and does a semi-join (filters to those variants). So try that:

mt = hl.import_bgen(path=bgen_files,
  entry_fields=[‘GT’],
  sample_file=sample_files[0],
  index_file_map=index_files,
  n_partitions=1000,
  variants=ht_variants)

It should be massively faster. Let us know what happens!


Unrelated style point:

hl.locus(ht_variants[‘chromosome’].replace(‘23’, ‘X’).replace(‘24’, ‘Y’).replace(‘25’, ‘X’).replace(‘26’, ‘MT’)

can be

translation_dict = {'23': 'X', '24': 'Y', '25': 'X', '26': 'MT'}
hl.locus(hl.literal(translation_dict),get(ht_variants['chromosome'], default=ht_variants['chromosome'])

#5

Amazing! That did the trick. Thanks Tim for your help!


#6

How long did it take with variants=ht_variants?


#7

After subsetting, it took about 10-12 minutes to run mt.count_rows() and about the same for hl.export_gen().

I’m hoping to increase the pool of cpus if I can get Spark to play nice with the Slurm queuing system on our cluster: https://researchcomputing.princeton.edu/faq/spark-via-slurm


#8

OK, that’s a bit more reasonable.

It’ll be a challenge to use Spark on Slurm - I know people have done it but it’s never been easy.