I'm using Hail to filter and annotate variants with allele frequencies and REVEL scores in the All of Us computing environment (Terra). While exporting the Hail MatrixTable with the following code, it's taking 33+ hours and still has not produced an output file. There seem to be multiple 3-hour jobs, and I can't figure out which part of my code is responsible for them. Is there something I could change to speed up the export process?
Thanks!
Code:
# Read the WGS joint callset as a Hail MatrixTable.
mt = hl.read_matrix_table(mt_wgs_path)
# Samples flagged by All of Us QC, keyed on sample_id for the column anti-join.
sample_to_remove = hl.import_table(flagged_samples, key="sample_id")
# Drop flagged samples, then keep rows that pass all VCF FILTERs and
# have a call rate above 99%.
mt = mt.anti_join_cols(sample_to_remove)
mt = mt.filter_rows(hl.len(mt.filters) == 0)
mt = mt.filter_rows(mt.variant_qc.call_rate > 0.99)
# Build a Hail key table of long-QT person_ids.
# The ids are cast to str so they match the MatrixTable sample key `s`
# (assumed to be a string — TODO confirm against mt.s dtype).
lqt_ids_df = wgs_longQT[["person_id"]].copy()
# .astype(str) is the idiomatic (and vectorized) pandas cast; the original
# used the slower element-wise .apply(str).
lqt_ids_df["person_id"] = lqt_ids_df["person_id"].astype(str)
hl_LQT_indiv = hl.Table.from_pandas(lqt_ids_df, key="person_id")
# Subset the WGS matrix to columns (samples) present in the long-QT cohort.
LQT_WGS_mt = mt.semi_join_cols(hl_LQT_indiv)
# Import the predicted-ancestry table (one row per research_id).
ancestry_pred = hl.import_table(
    ancestry_path,
    key="research_id",
    impute=True,
    types={"research_id": "tstr", "pca_features": hl.tarray(hl.tfloat)},
)
# Restrict to long-QT patients, drop fields we don't need, align the key
# name with the MatrixTable's sample id, and remove flagged samples.
ancestry_pred_LQT = (
    ancestry_pred
    .semi_join(hl_LQT_indiv)
    .drop('probabilities', 'pca_features', 'ancestry_pred_other')
    .rename({'research_id': 'person_id'})
    .anti_join(sample_to_remove)
)
# Attach each sample's predicted ancestry as a column field.
LQT_WGS_mt = LQT_WGS_mt.annotate_cols(
    ancestry_pred=ancestry_pred_LQT[LQT_WGS_mt.s].ancestry_pred
)
# Import the Variant Annotation Table (VAT).
# PERFORMANCE FIX: the original passed BOTH force=True and force_bgz=True.
# `force=True` tells Hail to read a gzipped file serially on a single core
# — for a file the size of the AoU VAT that alone can account for
# multi-hour stages. `force_bgz=True` (treat the .gz as block-gzipped and
# read it in parallel) is the right flag here on its own; the AoU VAT is
# distributed block-gzipped — TODO confirm for your vat_path.
vat_table = hl.import_table(
    vat_path,
    quote='"',
    delimiter="\t",
    force_bgz=True,
    types={'position': hl.tint32},
)
# Keep only canonical transcripts (field is a string "true"/"false").
vat_table = vat_table.filter(vat_table["is_canonical_transcript"] == "true")
# Build locus/allele fields so the VAT can be keyed like the MatrixTable rows.
vat_table = vat_table.annotate(
    locus=hl.locus(vat_table.contig, vat_table.position,
                   reference_genome='GRCh38'),
    allele=hl.array([vat_table.ref_allele, vat_table.alt_allele]),
)
vat_table = vat_table.key_by("locus", "allele")
# Keep only VAT rows for variants present in the filtered WGS matrix.
vat_table = vat_table.semi_join(LQT_WGS_mt.rows())
# Annotate rows with REVEL, gene symbol, consequence, and per-population
# allele frequencies from the VAT.
# PERFORMANCE FIX: bind the join expression ONCE. In the original, each of
# the ten `vat_table[LQT_WGS_mt.row_key]` expressions is written as a
# separate join; reusing a single bound expression guarantees one join.
vat_row = vat_table[LQT_WGS_mt.row_key]
LQT_WGS_mt = LQT_WGS_mt.annotate_rows(
    revel=vat_row.revel,
    gene_symbol=vat_row.gene_symbol,
    consequence=vat_row.consequence,
    gvs_afr_af=vat_row.gvs_afr_af,
    gvs_amr_af=vat_row.gvs_amr_af,
    gvs_eas_af=vat_row.gvs_eas_af,
    gvs_eur_af=vat_row.gvs_eur_af,
    gvs_mid_af=vat_row.gvs_mid_af,
    gvs_sas_af=vat_row.gvs_sas_af,
    gvs_oth_af=vat_row.gvs_oth_af,
)
# Exclude variants whose gene is on the stop list.
gene_stoplist = pd.read_csv("gene-stoplist.tsv", sep="\t")
gene_stoplist = gene_stoplist["Gene"].tolist()
# PERFORMANCE: use a set literal so membership is a hash lookup per row
# instead of a linear scan of a list literal.
stoplist_set = hl.literal(set(gene_stoplist))
LQT_WGS_mt = LQT_WGS_mt.filter_rows(
    stoplist_set.contains(LQT_WGS_mt.gene_symbol), keep=False
)
## Create a MatrixTable per ancestry, filtered to rare variants.
# Ancestry labels provided by All of Us.
possible_ancestries = ["afr", "amr", "eas", "eur", "mid", "sas", "oth"]
# Allele-frequency row-field name for each subpopulation.
af_subpop_label = {a: "gvs_" + a + "_af" for a in possible_ancestries}
# Split columns (samples) by predicted ancestry.
wgs_tables_by_ancestry = {
    a: LQT_WGS_mt.filter_cols(LQT_WGS_mt.ancestry_pred == a)
    for a in possible_ancestries
}
# Keep variants rare in the matching subpopulation (AF <= 0.357%).
# BUG FIX: the original first built a dict of no-op annotate_rows calls
# (re-annotating each gvs_*_af field with itself) and then immediately
# overwrote that dict with the filtered one below — that dead step is
# removed. Using af_subpop_label also replaces seven hand-written,
# copy-pasted dict entries with one comprehension.
filtered_wgs_tables_by_ancestry = {
    a: wgs_tables_by_ancestry[a].filter_rows(
        hl.float64(wgs_tables_by_ancestry[a][af_subpop_label[a]]) <= 0.00357
    )
    for a in possible_ancestries
}
# Split each ancestry's table into one table per functional consequence.
# NOTE: dict insertion order matters downstream — 'missense' must come first.
consequence_terms = [
    'missense',
    'frameshift_variant',
    'stop_lost',
    'start_lost',
    'stop_gained',
]
for ancs in list(filtered_wgs_tables_by_ancestry):
    anc_mt = filtered_wgs_tables_by_ancestry[ancs]
    filtered_wgs_tables_by_ancestry[ancs] = {
        term: anc_mt.filter_rows(anc_mt.consequence.contains(term))
        for term in consequence_terms
    }
# Restrict missense variants to those predicted deleterious (REVEL > 0.644).
for ancs in filtered_wgs_tables_by_ancestry:
    missense_mt = filtered_wgs_tables_by_ancestry[ancs]['missense']
    filtered_wgs_tables_by_ancestry[ancs]['missense'] = missense_mt.filter_rows(
        hl.float64(missense_mt.revel) > 0.644
    )
# Combine the per-consequence tables back into one MatrixTable per ancestry.
# NOTE(review): `union_cols` concatenates *columns*, but the per-consequence
# tables of one ancestry share the same samples and differ only in rows —
# combining row subsets is normally `union_rows`. As written this appends
# duplicate sample columns and (by union_cols' default inner row join) keeps
# only variants present in every consequence table. Each `union_cols` is
# also a join, so this nested loop is a likely source of the long Spark
# stages — confirm intent and consider `union_rows` / checkpointing here.
combined_consequences = dict()
for ancs in filtered_wgs_tables_by_ancestry.keys():
    # Start from 'missense' (the first dict entry) and fold in the rest.
    mt = filtered_wgs_tables_by_ancestry[ancs]['missense']
    other_consequences = list(filtered_wgs_tables_by_ancestry[ancs].keys())[1:]
    for c in other_consequences:
        mt = mt.union_cols(filtered_wgs_tables_by_ancestry[ancs][c])
    combined_consequences[ancs] = mt
# Combine the per-ancestry matrices into a single MatrixTable.
# (Across ancestries the sample sets are disjoint, so union_cols fits here,
# but note the default inner row join drops variants absent from any
# ancestry's table — presumably not intended; verify.)
mt = combined_consequences['afr']
other_matrices = list(combined_consequences.keys())[1:]
for ancs in other_matrices:
    mt = mt.union_cols(combined_consequences[ancs])
# Annotate phenotype / ClinVar fields from the VAT.
# PERFORMANCE FIX: bind the join once instead of writing five separate
# `vat_table[mt.row_key]` join expressions.
vat_row = vat_table[mt.row_key]
mt = mt.annotate_rows(
    gene_id=vat_row.gene_id,
    omim_phenotypes_name=vat_row.omim_phenotypes_name,
    clinvar_classification=vat_row.clinvar_classification,
    clinvar_last_updated=vat_row.clinvar_last_updated,
    clinvar_phenotype=vat_row.clinvar_phenotype,
)
# Flatten to one record per (variant, sample) with a non-reference genotype.
pv_table = mt.filter_entries(mt.GT.is_non_ref()).entries()
pv_table = pv_table.key_by(pv_table.locus, pv_table.alleles)
# Keep only the fields needed for the candidate-gene table.
pv_table = pv_table.select(
    pv_table.s,
    pv_table.gene_id,
    pv_table.gene_symbol,
    pv_table.consequence,
    pv_table.omim_phenotypes_name,
    pv_table.clinvar_classification,
    pv_table.clinvar_phenotype,
    pv_table.clinvar_last_updated,
    pv_table.ancestry_pred,
)
# checkpoint() = write + read back, so the export below reuses the
# materialized table instead of recomputing the whole pipeline.
pv_table = pv_table.checkpoint("pv_table.ht")
# BUG FIX: the original called `hl.read_table(...).export_vcf(pv_table, ...)`
# — hail Tables have no `export_vcf` method, and the table was passed as an
# argument to itself; use the module-level hl.export_vcf.
# NOTE(review): a flattened entries table with these extra top-level fields
# may not be accepted by the VCF exporter; if a delimited file is the real
# goal, `pv_table.export("pv_table.tsv.bgz")` is the simpler path — confirm.
hl.export_vcf(pv_table, 'pv_table.vcf', parallel='header_per_shard')
Screenshot of Spark Console: