Hi @danking, thanks for your quick reply!
The slow single-task stage occurs when I run the code block listed above, but I’ll certainly try writing out the matrix table before the slow row-filtering stage.
The full code is below, in blocks, with a description of how each one runs:
# Initialise hail and spark
import pyspark.sql
config = pyspark.SparkConf().setAll([('spark.kryoserializer.buffer.max', '128')])
sc = pyspark.SparkContext(conf=config)
from pyspark.sql import SparkSession
import hail as hl
builder = (
SparkSession
.builder
.enableHiveSupport()
)
spark = builder.getOrCreate()
hl.init(sc=sc)
import dxpy
# Define the chromosome you're working with and variables used in the import
chr = 21
file_url = f"file:///mnt/project/Bulk/Exome sequences/Population level exome OQFE variants, pVCF format - final release/*_c{chr}_*.vcf.gz"
hl.import_vcf(file_url,
              force_bgz=True,
              reference_genome='GRCh38',
              array_elements_required=False).write(f"./chr_{chr}_initial_mt.mt", overwrite=True)
This stage runs in parallel and appears to be fairly efficient. It scales as I scale up the number of nodes.
mt=hl.read_matrix_table(f"./chr_{chr}_initial_mt.mt")
print(f"Num partitions: {mt.n_partitions()}")
# Then check if it looks as you'd expect
mt.describe()
# First check initial variant counts
print(f'n original variants in chromosome {chr}: {mt.count_rows()}')
# Remove sites labelled as 'monoallelic'
mt=mt.filter_rows(hl.is_missing(mt.filters))
print(f'n variants in chromosome {chr} following removal of those labelled as monoallelic: {mt.count_rows()}')
# Remove sites with > 6 alleles
mt = mt.filter_rows(mt.alleles.length() <= 6)
print(f'n variants in chromosome {chr} following removal of those with more than 6 alleles: {mt.count_rows()}')
# Split multi-allelic sites
mt = hl.split_multi_hts(mt)
print(f'n variants in chromosome {chr} following splitting of multiallelic sites: {mt.count_rows()}')
print('Early variant QC complete')
These stages all seem to run smoothly and in parallel.
# Genotype QC
# Filtering on allele balance
# Create the allele balance variable
ab = mt.AD[1] / hl.sum(mt.AD)
# Below removing: genotypes listed as homozygous reference but with >10% alternate allele reads | genotypes listed as heterozygous without a reference:alternate balance of ~1:1 | and genotypes listed as homozygous alternate with over 10% reference reads
filter_condition_ab = ((mt.GT.is_hom_ref() & (ab <= 0.1)) | (mt.GT.is_het() & (ab >= 0.25) & (ab <= 0.75)) | (mt.GT.is_hom_var() & (ab >= 0.9)))
mt = mt.filter_entries(filter_condition_ab)
print('Filtering on allele balance completed')
# Keep only entries with depth (DP) > 10
mt = mt.filter_entries(mt.DP > 10)
print('Filtering on depth completed')
# Keep only entries with genotype quality (GQ) > 30
mt = mt.filter_entries(mt.GQ > 30)
print('Filtering on genotype quality completed')
# Remove rows that are no longer variant (AF of 0 or 1) following genotype QC
mt = hl.variant_qc(mt)
mt = mt.filter_rows((mt.variant_qc.AF[0] == 0.0) | (mt.variant_qc.AF[0] == 1.0), keep = False)
print(f'n variants in chr {chr} following genotype QC and removal of non-variant rows: {mt.count_rows()}')
print('Genotype QC complete')
It is at the end of this block (after ‘Filtering on allele balance completed’, ‘Filtering on depth completed’, and ‘Filtering on genotype quality completed’ have been printed) that a single task appears to run. The filtering stages all appear to run quickly, but the final stage, where non-variant rows are removed, takes hours as a single task. Given your suggestion, I could write out the matrix table after filtering the genotypes, then read it back in before removing non-variant rows (a sketch of what I have in mind is below). Would you recommend doing this before or after calculating the variant QC metrics - i.e. is the calculation of these metrics the single stage?
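For reference, here’s roughly what I had in mind for that checkpoint, as a sketch rather than anything I’ve tested yet; the intermediate path ./chr_{chr}_post_entry_filter.mt is just a placeholder name:
# Checkpoint after the entry-level filters, before computing variant QC metrics
mt.write(f"./chr_{chr}_post_entry_filter.mt", overwrite=True)
mt = hl.read_matrix_table(f"./chr_{chr}_post_entry_filter.mt")
# Recompute variant QC metrics on the checkpointed table, then drop non-variant rows
mt = hl.variant_qc(mt)
mt = mt.filter_rows((mt.variant_qc.AF[0] == 0.0) | (mt.variant_qc.AF[0] == 1.0), keep=False)
print(f'n variants in chr {chr} following genotype QC and removal of non-variant rows: {mt.count_rows()}')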
Following this, later stages seem to run smoothly:
# Drop all entries except the genotype
mt=mt.select_entries(mt.GT)
mt.describe()
# Save out this data table using DNAX
# Define database and MT names
# Note: It is recommended to only use lowercase letters for the database name.
# If uppercase lettering is used, the database name will be lowercased when creating the database.
db_name = "ukbb_test_wes_matrix_tables_feb23_test"
mt_name = f"chromosome_{chr}_post_genoqc_final.mt"
# Create database in DNAX
stmt = f"CREATE DATABASE IF NOT EXISTS {db_name} LOCATION 'dnax://'"
print(stmt)
spark.sql(stmt).show()
# Store MT in DNAX
import dxpy
# Find database ID of newly created database using dxpy method
db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
url = f"dnax://{db_uri}/{mt_name}"
# Note: the dnax url must follow this format to properly save MT to DNAX
# Before this step, the Hail MatrixTable is just an object in memory. To persist it and be able to access
# it later, the notebook needs to write it into a persistent filesystem (in this case DNAX).
mt.write(url)
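For completeness, this is how I’d expect to read the table back in a later session (assuming db_name and mt_name are defined as above; the lookup mirrors the one used before writing):
# In a later session: re-locate the database and read the MatrixTable back from DNAX
import dxpy
import hail as hl
db_uri = dxpy.find_one_data_object(name=f"{db_name}", classname="database")['id']
mt = hl.read_matrix_table(f"dnax://{db_uri}/{mt_name}")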
Thanks for the other suggestions! As far as I know, the RAP doesn’t support autoscaling, but I’ll look into it to make sure that’s definitely true, as autoscaling seems like it would be really helpful for this work.
I hadn’t realised Hail was designed to tolerate spot instances, so thanks for that! That could make this processing much cheaper, which would be great!
Thanks again for all your help with this!