Most efficiently using cloud resources

Hi there,

I’m currently processing pVCF files for 500k exomes using Hail on the UKBB RAP. I’m trying to make this as efficient as possible given the cost of these analyses, and have a question about tasks that appear to use only a single node/worker. I’m monitoring my jobs using the Spark UI, and most tasks (importing VCFs, writing the matrix table out, and doing some early variant-level QC) appear to run efficiently, using all the workers I have available and running in parallel across partitions. Some tasks, however, appear as a single task. For example, I run genotype QC steps as follows, and then remove variants with no alleles left after genotype QC:

# Keep entries with depth > 10 (i.e. remove entries with DP <= 10)
mt = mt.filter_entries(mt.DP > 10)
print('Filtering on depth completed')
# Keep entries with genotype quality > 30 (i.e. remove entries with GQ <= 30)
mt = mt.filter_entries(mt.GQ > 30)
print('Filtering on genotype quality completed')

# Remove variants with no alleles left following genotype QC
mt = hl.variant_qc(mt)
mt = mt.filter_rows((mt.variant_qc.AF[0] == 0.0) | (mt.variant_qc.AF[0] == 1.0), keep = False)
print(f'n variants in chr {chr} following genotype QC and removal of non-variant rows: {mt.count_rows()}')
print('Genotype QC complete')

The final stage of this process (calculating variant QC metrics and removing variants with no remaining alleles) doesn’t appear to run across partitions, but instead runs as a single stage that takes a few hours. Is this because calculating variant QC metrics requires all variants to be considered at once? I worry that only one worker is being used for tasks like this, and that the other nodes (which I’m still paying for) sit idle for the few hours this stage takes. Is that the case, and if so, is there any way to parallelise this step?

Thanks for your help!

Hi @eilidh !

The code you shared lacks an “action” like a write or export, so it’s hard to comment on it directly. Could you share the full code?

What happens next? Perhaps an mt.write or hl.export_vcf?

Often a single stage indicates that you’ve requested a single output file, for example, a single VCF or a single TSV. In those cases, you can preserve parallelism for the expensive portion of your pipeline like this:

# Do all the expensive work in parallel
mt.write(...)
# Read that saved work
mt = hl.read_matrix_table(...)
# Do the necessarily non-parallel single file creation
hl.export_vcf(mt, ...)
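
As an aside, Hail’s MatrixTable.checkpoint wraps this write-then-read pattern in a single call; here is a minimal sketch (the path is just a placeholder for wherever you want the intermediate matrix table saved):

# checkpoint(path) is equivalent to writing the matrix table and reading it straight back in
# (the path below is a placeholder, not one from this pipeline)
mt = mt.checkpoint('/path/to/intermediate.mt', overwrite=True)
# The necessarily non-parallel single-file export then reads from the saved, partitioned copy
hl.export_vcf(mt, ...)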

My other big suggestion here is to use auto-scaling clusters. When you’re working with big datasets like 500k exomes, auto-scaling clusters are critical for efficiently using cloud resources because you’re not charged for resources you’re not using. We provide some autoscaling configurations for Google Dataproc here and Google provides some documentation as well. I’m not familiar with the RAP, but it looks like Amazon EMR does support autoscaling.

One final suggestion, if you’re not already using “spot” instances you should use them! Hail is pervasively designed to tolerate spot VMs and they’re usually much cheaper than non-spot VMs!

Hi @danking, thanks for your quick reply!

The slow single stage occurs when I run the code block listed above, but I’ll certainly try writing out the matrix table before the slow row-filtering stage.
The full code is below, in blocks, with a description of how each one runs:

# Initialise hail and spark 
import pyspark.sql
config = pyspark.SparkConf().setAll([('spark.kryoserializer.buffer.max', '128')])
sc = pyspark.SparkContext(conf=config) 
from pyspark.sql import SparkSession
import hail as hl
builder = (
    SparkSession
    .builder
    .enableHiveSupport()
)
spark = builder.getOrCreate()
hl.init(sc=sc)
import dxpy
# Define the chromosome you're working with and variables used in the import 
chr = 21
file_url = f"file:///mnt/project/Bulk/Exome sequences/Population level exome OQFE variants, pVCF format - final release/*_c{chr}_*.vcf.gz"
# Note: write() returns None, so there is no need to assign its result; the matrix table is read back in below
hl.import_vcf(file_url,
              force_bgz=True,
              reference_genome='GRCh38',
              array_elements_required=False).write(f"./chr_{chr}_initial_mt.mt", overwrite=True)

This stage runs in parallel and appears to be fairly efficient. It scales as I scale up the number of nodes.

mt=hl.read_matrix_table(f"./chr_{chr}_initial_mt.mt")
print(f"Num partitions: {mt.n_partitions()}")
# Then check if it looks as you'd expect
mt.describe()
# First check initial variant counts
print(f'n original variants in chromosome {chr}: {mt.count_rows()}')
# Remove sites labelled as 'monoallelic'
mt=mt.filter_rows(hl.is_missing(mt.filters))
print(f'n variants in chromosome {chr} following removal of those labelled as monoallelic: {mt.count_rows()}')
# Remove sites with > 6 alleles 
mt = mt.filter_rows(mt.alleles.length() <= 6)
print(f'n variants in chromosome {chr} following removal of those with more than 6 alleles: {mt.count_rows()}')
# Split multi-allelic sites 
mt = hl.split_multi_hts(mt)
print(f'n variants in chromosome {chr} following splitting of multiallelic sites: {mt.count_rows()}')
print('Early variant QC complete')

These stages all seem to run smoothly and in parallel.

# Genotype QC
# Filtering on allele balance
# Create the allele balance variable
ab = mt.AD[1] / hl.sum(mt.AD)
# Below removing: genotypes called homozygous reference but with >10% alternate allele reads | genotypes called heterozygous without a reference:alternate balance of roughly 1:1 | genotypes called homozygous alternate with >10% reference reads
filter_condition_ab = ((mt.GT.is_hom_ref() & (ab <= 0.1)) | (mt.GT.is_het() & (ab >= 0.25) & (ab <= 0.75)) | (mt.GT.is_hom_var() & (ab >= 0.9)))
mt = mt.filter_entries(filter_condition_ab)
print('Filtering on allele balance completed')
# Keep entries with depth > 10 (i.e. remove entries with DP <= 10)
mt = mt.filter_entries(mt.DP > 10)
print('Filtering on depth completed')
# Keep entries with genotype quality > 30 (i.e. remove entries with GQ <= 30)
mt = mt.filter_entries(mt.GQ > 30)
print('Filtering on genotype quality completed')
# Remove variants with no alleles left following genotype QC
mt = hl.variant_qc(mt)
mt = mt.filter_rows((mt.variant_qc.AF[0] == 0.0) | (mt.variant_qc.AF[0] == 1.0), keep = False)
print(f'n variants in chr {chr} following genotype QC and removal of non-variant rows: {mt.count_rows()}')
print('Genotype QC complete')

It is at the end of this block (after ‘Filtering on allele balance completed’, ‘Filtering on depth completed’, and ‘Filtering on genotype quality completed’ have been printed) that a single task appears to run. The filtering stages all appear to run quickly, but the final stage, where non-variant rows are removed, takes hours as a single stage. Given your suggestion, I could write out the matrix table after filtering genotypes, and then read it back in to remove the non-variant rows. Would you recommend doing this before or after calculating variant QC metrics, i.e. is the calculation of these metrics the single stage?
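
For concreteness, the placement I’m considering (writing out after calculating variant QC metrics, then reading back in before the row filter) would look roughly like this, with the output path just a placeholder:

# Compute variant QC metrics across the filtered entries
mt = hl.variant_qc(mt)
# Write the intermediate matrix table out (placeholder path), then read the saved copy back in
mt.write(f"./chr_{chr}_genoqc_intermediate.mt", overwrite=True)
mt = hl.read_matrix_table(f"./chr_{chr}_genoqc_intermediate.mt")
# Then remove rows with no alternate alleles remaining after genotype QC
mt = mt.filter_rows((mt.variant_qc.AF[0] == 0.0) | (mt.variant_qc.AF[0] == 1.0), keep=False)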

Following this, later stages seem to run smoothly:

# Drop all entries except the genotype
mt=mt.select_entries(mt.GT)
mt.describe()
# Save out this data table using DNAX 
# Define database and MT names
# Note: It is recommended to only use lowercase letters for the database name.
# If uppercase lettering is used, the database name will be lowercased when creating the database.
db_name = "ukbb_test_wes_matrix_tables_feb23_test"
mt_name = f"chromosome_{chr}_post_genoqc_final.mt"
# Create database in DNAX
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
print(stmt)
spark.sql(stmt).show()
# Store MT in DNAX
import dxpy
# Find database ID of newly created database using dxpy method
db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
url = f"dnax://{db_uri}/{mt_name}" 
# Note: the dnax url must follow this format to properly save MT to DNAX
# Before this step, the Hail MatrixTable is just an object in memory. To persist it and be able to access 
# it later, the notebook needs to write it into a persistent filesystem (in this case DNAX).
mt.write(url) 

Thanks for the other suggestions! As far as I know, the RAP doesn’t support autoscaling, although I’ll look into this to confirm, as autoscaling seems like it would be really helpful for this work.

I hadn’t realised Hail was designed to tolerate spot instances, so thanks for that! That could make this processing much cheaper, which would be great!

Thanks again for all your help with this!

Thanks so much for the tip on saving out the matrix table before the next stages, as this has removed the slow single stage!

Writing out the matrix table before this stage enabled everything to run in parallel and eliminated the single stage, really speeding up the overall process. In case others have a similar query, the script I now use for that section is as follows:

# Filtering on allele balance
# Create the allele balance variable
ab = mt.AD[1] / hl.sum(mt.AD)
# Below removing: genotypes called homozygous reference but with >10% alternate allele reads | genotypes called heterozygous without a reference:alternate balance of roughly 1:1 | genotypes called homozygous alternate with >10% reference reads
filter_condition_ab = ((mt.GT.is_hom_ref() & (ab <= 0.1)) | (mt.GT.is_het() & (ab >= 0.25) & (ab <= 0.75)) | (mt.GT.is_hom_var() & (ab >= 0.9)))
mt = mt.filter_entries(filter_condition_ab)
print('Filtering on allele balance completed')
# Keep entries with depth > 10 (i.e. remove entries with DP <= 10)
mt = mt.filter_entries(mt.DP > 10)
print('Filtering on depth completed')
# Keep entries with genotype quality > 30 (i.e. remove entries with GQ <= 30)
mt = mt.filter_entries(mt.GQ > 30)
print('Filtering on genotype quality completed')

This section runs immediately.

Then saving out a matrix table after calculating variant QC metrics runs in parallel using all workers available.

mt = hl.variant_qc(mt)
# The next stage (filtering out non-variant sites) runs as a very slow single stage if the mt is not saved out first
# So best to save out at this point in parallel, enabling later stages to run in parallel too.
mt.write(f"./chr_{chr}_postgenoQC_mt.mt", overwrite=True)

And then reading this mt back in and removing non-variant rows runs in parallel and quickly.

mt=hl.read_matrix_table(f"./chr_{chr}_postgenoQC_mt.mt")
# Remove variants with no alleles left following genotype QC
# When run without saving out the matrix table first, this was a very slow single stage, but it now runs quickly in parallel.
mt = mt.filter_rows((mt.variant_qc.AF[0] == 0.0) | (mt.variant_qc.AF[0] == 1.0), keep = False)
print(f'n variants in chr {chr} following genotype QC and removal of non-variant rows: {mt.count_rows()}')
print('Genotype QC complete')

This stage (which was previously a single stage) now runs in parallel.

Overall, writing out the matrix table and reading it back in is much more efficient (it uses all workers) and completes much more quickly than running this step without saving out.