Best practices for UK Biobank Imputed Data

Hi,

I am using GCP to perform various genomics analyses on UKB imputed data, such as GWAS and PRS. For PRS this entails fetching dosages for thousands of SNPs from the imputed dataset for nearly the entire cohort.

Traditionally I would simply use the BGEN files on an HPC, but this is not feasible in the cloud as it would require localization of these large BGEN files to a VM, or paying for NFS-style storage which is quite costly.

Hail seems like the perfect fit. So we converted one of the smaller chromosomes, uploaded the MatrixTable to a Google Storage bucket, and tested a query that filters to a subset of SNPs on that chromosome. As noted in previous threads on this discussion board, the initial filter is fast but the write is slow. In our case the write takes > 1 hour to complete, which is comparable to localizing the BGEN file to the VM, so apart from the storage saved on the VM there is little cost benefit.

My questions are:

  1. Is such a use case within the scope of what Hail is designed to do?
  2. Are there performance improvements we can implement to fetch genetic variants for all individuals faster?

Thanks,

Vince

Hey @vforget!

Yes, QC, PRS, GWAS, etc. on very large datasets are exactly the operations for which Hail was designed.

Can you share the code you used? It sounds like you might be trying to copy all the genotypes into a new file. That is slow and expensive. Hail is designed to never modify or copy the genotype data. When you produce new variant or sample annotations, you should save them separately from the genotypes. In particular, if your data is already stored in a BGEN file, you should never use MatrixTable.write.
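
For example, here is a minimal sketch of that pattern (the bucket paths are hypothetical): read the BGEN lazily, compute an annotation, and write only the annotation Table, not the genotypes.

import hail as hl

# Read directly from the bucket (assumes the BGEN has already been indexed with hl.index_bgen).
mt = hl.import_bgen('gs://my-bucket/ukb_chr22.bgen',
                    entry_fields=['dosage'],
                    sample_file='gs://my-bucket/ukb.sample')

# Compute a per-variant annotation, e.g. the mean dosage at each site ...
mt = mt.annotate_rows(mean_dosage=hl.agg.mean(mt.dosage))

# ... and save just the row Table, not a copy of the genotype matrix.
mt.rows().select('mean_dosage').write('gs://my-bucket/chr22_mean_dosage.ht')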

Hi @danking,

Sorry for the late reply. We are indeed converting BGEN to a MatrixTable, and clearly this is the issue. How would one go about storing BGEN files in the cloud without needing to localize the data for analysis? For example, we want to extract a subset of SNPs from UKB for use with a PRS. To do so, I don’t want to localize the BGEN files from the bucket every time I want to extract SNPs. Given this, we tried converting the BGEN to a MatrixTable, storing the MatrixTable in the cloud, and querying it via Hail. Here is the code to build the MatrixTables:


#!/usr/bin/python3
# sys.argv[1] = chromosome
# sys.argv[2] = chromosome-specific BGEN file
# sys.argv[3] = path to sample file
import sys
import hail

chrom = sys.argv[1]
bgen_file = sys.argv[2]
sample_file = sys.argv[3]
mt_file = "%s.mt" % chrom

# One-time indexing; recode the zero-padded contigs "01".."09" to "1".."9".
hail.index_bgen(bgen_file,
                contig_recoding={"01": "1", "02": "2", "03": "3", "04": "4", "05": "5",
                                 "06": "6", "07": "7", "08": "8", "09": "9"},
                reference_genome='GRCh37')

# Import the indexed BGEN and write it out as a MatrixTable.
d = hail.import_bgen(bgen_file,
                     entry_fields=['dosage'],
                     sample_file=sample_file)
d.write(mt_file)

We then copy the Matrix tables to a bucket.

We then query the matrix tables like so:

import hail as hl, pandas as pd
### function to extract snps 

def extract_snps(chrom):
  mt_file = "gs://..." + str(chrom) + ".mt"
  hl_mt = hl.read_matrix_table(mt_file)
  # get rsids for specific chromosome
  prs_data_chrom = prs_data.loc[prs_data['chr_name'] == chrom]
  rsids = prs_data_chrom.rsID.values
  set_to_keep = hl.literal(set(rsids))
  hl_mt_filt = hl_mt.filter_rows(set_to_keep.contains(hl_mt['rsid']))
  hl_mt_filt.count()
  return hl_mt_filt

### function to create prs

def create_prs(hl_mt, prs_data, chrom):
  prs_data_22 = prs_data.loc[prs_data['chr_name'] == chrom]
  prs_data_22_ea = prs_data_22[['effect_allele', 'rsID']]
  prs_data_22_ea.rename(columns={'rsID':'rsid'}, inplace=True)
  prs_data_mt = hl.Table.from_pandas(prs_data_22_ea, key = 'rsid')
  hl_mt = hl_mt.annotate_rows(**prs_data_mt[hl_mt.rsid])
  prs_data_22_weight = prs_data_22[['effect_weight', 'rsID']]
  prs_data_22_weight.rename(columns={'rsID':'rsid'}, inplace=True)
  prs_data_weight_mt = hl.Table.from_pandas(prs_data_22_weight, key = 'rsid')
  hl_mt = hl_mt.annotate_rows(**prs_data_weight_mt[hl_mt.rsid])
  flip = hl.case().when(hl_mt.effect_allele == hl_mt.alleles[0], True).when(hl_mt.effect_allele == hl_mt.alleles[1], False).or_missing()
  hl_mt = hl_mt.annotate_rows(flip = flip)
  hl_mt = hl_mt.filter_rows(hl.is_defined(hl_mt.flip))
  hl_mt = hl_mt.annotate_cols(prs=hl.agg.sum(hl_mt.effect_weight * hl.if_else(hl_mt.flip, 2 - hl_mt.dosage, hl_mt.dosage)))
  hl_mt.cols().export(("%s_mt_prs.txt.bgz") % (chrom))

### running extraction

hl.init(min_block_size=128)
prs = 'prs.txt'
prs_data = pd.read_csv(prs, sep = "\t", header = 0)
import timeit
chrom = 3
start = timeit.default_timer()
hl_mt = extract_snps(chrom)
hl_mt.entries().export(("%s_genotype_dosage.tsv.bgz") % (chrom))
hl_mt.write(("%s_biomarin_prs_snps.mt") % (chrom))
end = timeit.default_timer()
print(end-start)
start = timeit.default_timer()
create_prs(hl_mt, prs_data, chrom)
end = timeit.default_timer()
print(end-start)
del hl_mt

The above code can clearly be improved, but based on your feedback it seems we have yet to fully grok how to use Hail effectively.

So, how do you recommend we query UK Biobank BGEN files to avoid localization of entire chromosomes to a VM for each extraction?

Thanks in advance for your help. I am keen to learn more about Hail, so if you can also point me to additional documentation that may be helpful that would also be great.

Vince

So, I see that when you use hl.read_matrix_table(mt_file) you’re using a file path that points to a Google Storage bucket. That works for BGEN files too: you can call index_bgen("gs://path/to/bgen/file", ...) directly and then do the same with import_bgen. So skip the first step of turning BGENs into MatrixTables and writing them to a bucket, and import the BGEN file directly as:

hl_mt = hl.import_bgen(...)
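
For example, a sketch with hypothetical bucket paths (the contig_recoding mirrors the one in your conversion script):

import hail as hl

# One-time step: write the .idx2 index for the BGEN. By default it is written next to the
# BGEN file; if you can't write there, index_bgen and import_bgen accept an index_file_map
# that points the index at another location.
hl.index_bgen('gs://my-bucket/ukb_chr3.bgen',
              contig_recoding={'01': '1', '02': '2', '03': '3', '04': '4', '05': '5',
                               '06': '6', '07': '7', '08': '8', '09': '9'},
              reference_genome='GRCh37')

# Every later analysis reads lazily from the bucket; nothing is copied onto the VM.
hl_mt = hl.import_bgen('gs://my-bucket/ukb_chr3.bgen',
                       entry_fields=['dosage'],
                       sample_file='gs://my-bucket/ukb.sample')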

Moving on to the rest of your code:

Dan rightly suggests you avoid hl_mt.write(("%s_biomarin_prs_snps.mt") % (chrom)). That’s going to make a full copy of the UKB data as a MatrixTable. You’d be better off writing out just the row and column fields you’re interested in, the way you did with:

  hl_mt.cols().export(("%s_mt_prs.txt.bgz") % (chrom))

Also, I think hl_mt.entries() is going to be a somewhat expensive operation on a huge MatrixTable like this. It lifts the entire dataset into a very tall, skinny table, which is an expensive transformation. You might consider hl_mt.entry.export("..."), which writes out the entry data along with the row keys. If you really need the data in that entries-table form, though, you can leave it as it is.
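
A small sketch of that alternative (this assumes hl_mt carries a single dosage entry field; the output path is hypothetical):

# Writes a matrix-shaped TSV: the row key columns first, then one dosage column per sample.
# This avoids materializing the tall entries() table.
hl_mt.dosage.export('gs://my-bucket/chr3_dosages.tsv.bgz')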


Also open all the chromosomes together! Hail is designed to efficiently use a whole cluster and automatically shard data.

hl.import_bgen(['gs://path/to/chr1.bgen', 'gs://path/to/chr2.bgen', ...])

will work great!
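
For instance, something along these lines should work (hypothetical bucket paths and file names; each BGEN still needs the one-time hl.index_bgen pass described above):

import hail as hl

# Hypothetical per-chromosome paths; adjust to however your BGENs are actually named.
bgen_paths = ['gs://my-bucket/ukb_chr%d.bgen' % c for c in range(1, 23)]

# A single MatrixTable over all autosomes; Hail shards the work across the cluster.
hl_mt = hl.import_bgen(bgen_paths,
                       entry_fields=['dosage'],
                       sample_file='gs://my-bucket/ukb.sample')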

Amazing! I’ll try this out ASAP and report back. Have a nice weekend 🙂

Quick followup question:

Given the size of UKB, would this be more efficient to run on a Spark cluster or should I use a single VM with sufficient cores?

Clearly this is subject to cost, but I wanted to get a feel for the sweet spot in performance vs. cost, i.e., when to use a Spark cluster versus a single VM.

Thanks,

Vince

You’re definitely going to want a Spark cluster for something this big. It should be cheaper than the large VMs you’d need, and more flexible with regard to scaling up or down depending on the work you’re doing in a particular pipeline. I personally haven’t used it, but I know people have used Dataproc’s autoscaling functionality to scale clusters up and down during pipeline execution.

Thanks! I’ll dig into this as well.

I think I solved this: the fix was basically to run the SNP subsetting directly via import_bgen(). Using hl_mt.filter_rows() on the entire imported BGEN made exporting the filtered MatrixTable very slow, presumably because the code needs to visit the entire dataset.

Here is the code for those who may find it useful; it largely follows the Hail documentation for import_bgen. The PYSPARK_SUBMIT_ARGS setting allocates more memory, as I was extracting a few hundred SNPs in this example. I’m not sure whether hl.init(min_block_size=1024) is needed. The code runs in a few minutes on a 16-core VM using the UKB 22.bgen file; the indexing took maybe 10-20 minutes.

The only downside is that I can’t filter by rsid, but that’s not really a limitation.

Amazing!

# Set the following shell variable if hl.export_vcf() fails with a "Java heap space" out-of-memory error:
# export PYSPARK_SUBMIT_ARGS='--driver-memory 16g --executor-memory 16g pyspark-shell'

import hail as hl
hl.init(min_block_size=1024)

# Do this only a single time.
# hl.index_bgen("gs://.../22.bgen", reference_genome='GRCh37')

# variants.txt has contents such as:
# v
# 22:17495145:C:T
# 22:17564907:G:A
# ...
variants = hl.import_table("variants.txt")
variants = variants.annotate(v=hl.parse_variant(variants.v)).key_by('v')

# Only the requested variants are read from the BGEN.
d = hl.import_bgen("gs://.../22.bgen",
                   entry_fields=['GP'],
                   sample_file="gs://.../22.sample",
                   variants=variants.v)

hl.export_vcf(d, "tmp.vcf")