Export VCF taking a long time, even when running in parallel

I’m using Hail to filter and annotate variants with allele frequencies and REVEL scores in the All of Us computing environment (Terra). While exporting the Hail MatrixTable with the following code, it’s been running for 33+ hours and still hasn’t produced an output file. There seem to be multiple 3-hour jobs, and I can’t figure out which part of my code is responsible for them. Is there something I could change to speed up the export process?

Thanks!
Code:

import hail as hl
import pandas as pd

# read the WGS Hail MatrixTable
mt = hl.read_matrix_table(mt_wgs_path)


# load flagged sample IDs as a Hail Table
sample_to_remove = hl.import_table(flagged_samples, key="sample_id")
# remove flagged samples
mt = mt.anti_join_cols(sample_to_remove)

# filter rows that do not pass quality control filters
mt = mt.filter_rows(hl.len(mt.filters)==0)

# filter for variants where call rate > 99% 
mt = mt.filter_rows(mt.variant_qc.call_rate > 0.99, keep = True)

# load long QT patient IDs as a Hail Table
longQT_indiv = wgs_longQT["person_id"]
longQT_indiv_df = longQT_indiv.to_frame()
longQT_indiv_df['person_id'] = longQT_indiv_df['person_id'].apply(str)
hl_LQT_indiv = hl.Table.from_pandas(longQT_indiv_df, key = "person_id")

# subset WGS matrix to only include patients with long QT 
LQT_WGS_mt = mt.semi_join_cols(hl_LQT_indiv)

# import the ancestry predictions as a Hail Table
ancestry_pred = hl.import_table(ancestry_path,
                                key="research_id",
                                impute=True,
                                types={"research_id": hl.tstr,
                                       "pca_features": hl.tarray(hl.tfloat)})

# get ancestry of LQT patients
ancestry_pred_LQT = ancestry_pred.semi_join(hl_LQT_indiv)

# get only ancestry and person_id
ancestry_pred_LQT = ancestry_pred_LQT.drop('probabilities','pca_features','ancestry_pred_other')
ancestry_pred_LQT = ancestry_pred_LQT.rename({'research_id' : 'person_id'})
# remove flagged samples
ancestry_pred_LQT = ancestry_pred_LQT.anti_join(sample_to_remove)

# annotate WGS table with ancestry
LQT_WGS_mt = LQT_WGS_mt.annotate_cols(ancestry_pred = ancestry_pred_LQT[LQT_WGS_mt.s].ancestry_pred)

# read VAT
vat_table = hl.import_table(vat_path, force=True, quote='"', delimiter="\t",
                            force_bgz=True, types={'position': hl.tint32})
# filter VAT to only include canonical transcripts
vat_table = vat_table.filter(vat_table["is_canonical_transcript"] == "true")

# add locus and allele as fields
vat_table = vat_table.annotate(
    locus=hl.locus(vat_table.contig, vat_table.position, reference_genome='GRCh38'),
    allele=hl.array([vat_table.ref_allele, vat_table.alt_allele]))

# key the table by locus and allele
vat_table = vat_table.key_by("locus", "allele")

vat_table = vat_table.semi_join(LQT_WGS_mt.rows())

# annotate REVEL score, gene symbol, consequence, and subpopulation allele frequencies
vat_row = vat_table[LQT_WGS_mt.row_key]
LQT_WGS_mt = LQT_WGS_mt.annotate_rows(revel=vat_row.revel,
                                      gene_symbol=vat_row.gene_symbol,
                                      consequence=vat_row.consequence,
                                      gvs_afr_af=vat_row.gvs_afr_af,
                                      gvs_amr_af=vat_row.gvs_amr_af,
                                      gvs_eas_af=vat_row.gvs_eas_af,
                                      gvs_eur_af=vat_row.gvs_eur_af,
                                      gvs_mid_af=vat_row.gvs_mid_af,
                                      gvs_sas_af=vat_row.gvs_sas_af,
                                      gvs_oth_af=vat_row.gvs_oth_af)

# exclude genes on the stop list
gene_stoplist = pd.read_csv("gene-stoplist.tsv", sep="\t")
gene_stoplist = gene_stoplist["Gene"].tolist()

LQT_WGS_mt = LQT_WGS_mt.filter_rows(hl.literal(gene_stoplist).contains(LQT_WGS_mt.gene_symbol), keep = False)

## create a table for each ancestry

# ancestry labels provided by All of Us
possible_ancestries = ["afr", "amr", "eas", "eur", "mid", "sas", "oth"]
# allele frequency field for each subpopulation
af_subpop_label = {a:"gvs_"+a+"_af" for a in possible_ancestries}
# create a dictionary of wgs tables with ancestry as key, and annotated wgs table filtered on ancestry as value 
wgs_tables_by_ancestry = {a: LQT_WGS_mt.filter_cols(LQT_WGS_mt.ancestry_pred == a, keep=True) for a in possible_ancestries}

# annotate each population table with its subpopulation allele frequency field
filtered_wgs_tables_by_ancestry = {
    a: wgs_tables_by_ancestry[a].annotate_rows(
        **{af_subpop_label[a]: wgs_tables_by_ancestry[a][af_subpop_label[a]]})
    for a in possible_ancestries}

# filter by allele frequency
filtered_wgs_tables_by_ancestry = {
    a: wgs_tables_by_ancestry[a].filter_rows(
        hl.float64(wgs_tables_by_ancestry[a][af_subpop_label[a]]) <= 0.00357,
        keep=True)
    for a in possible_ancestries}

# create a table for each consequence
consequences = ['missense', 'frameshift_variant', 'stop_lost', 'start_lost', 'stop_gained']
for ancs in filtered_wgs_tables_by_ancestry.keys():
    anc_mt = filtered_wgs_tables_by_ancestry[ancs]
    filtered_wgs_tables_by_ancestry[ancs] = {
        c: anc_mt.filter_rows(anc_mt.consequence.contains(c), keep=True)
        for c in consequences}

# filter missense variants by REVEL score
for ancs in filtered_wgs_tables_by_ancestry.keys():
    missense_mt = filtered_wgs_tables_by_ancestry[ancs]['missense']
    filtered_wgs_tables_by_ancestry[ancs]['missense'] = missense_mt.filter_rows(
        hl.float64(missense_mt.revel) > 0.644, keep=True)

# combine consequence tables for each ancestry
combined_consequences = dict()
for ancs in filtered_wgs_tables_by_ancestry.keys():
    mt = filtered_wgs_tables_by_ancestry[ancs]['missense']
    other_consequences = list(filtered_wgs_tables_by_ancestry[ancs].keys())[1:]
    for c in other_consequences:
        mt = mt.union_cols(filtered_wgs_tables_by_ancestry[ancs][c])
    combined_consequences[ancs] = mt

# combine ancestry tables 
mt = combined_consequences['afr']
other_matrices = list(combined_consequences.keys())[1:]
for ancs in other_matrices:
    mt = mt.union_cols(combined_consequences[ancs])

# annotate phenotype information
mt = mt.annotate_rows(gene_id = vat_table[mt.row_key].gene_id,
                      omim_phenotypes_name = vat_table[mt.row_key].omim_phenotypes_name,
                      clinvar_classification = vat_table[mt.row_key].clinvar_classification,
                      clinvar_last_updated = vat_table[mt.row_key].clinvar_last_updated,
                      clinvar_phenotype = vat_table[mt.row_key].clinvar_phenotype)

# get entries
pv_table = mt.filter_entries(mt.GT.is_non_ref()).entries()
pv_table = pv_table.key_by(pv_table.locus, pv_table.alleles)

# get fields needed for candidate gene table
pv_table = pv_table.select(pv_table.s,
                           pv_table.gene_id, 
                           pv_table.gene_symbol,
                           pv_table.consequence, 
                           pv_table.omim_phenotypes_name, 
                           pv_table.clinvar_classification, 
                           pv_table.clinvar_phenotype,
                           pv_table.clinvar_last_updated,
                           pv_table.ancestry_pred
                          )

pv_table.write("pv_table.ht")

pv_table = hl.read_table("pv_table.ht")
hl.export_vcf(pv_table, 'pv_table.vcf', parallel='header_per_shard')

Screenshot of Spark Console:


Hey @nistha!

I’m really sorry to hear you’re having a bad time with Hail.

The problem is the VAT table. It is very large (I believe around 100GiB; I don’t have direct access to the AoU data), and it is sorted by locus only, not by locus and alleles. As a result, Hail needs to sort the entire table before it can make any forward progress. One of the most expert Hail users I know spent multiple days getting this table imported and formatted for Hail. Are AoU users able to read data that other AoU users have produced? It would be easiest to just use her already-imported Table.

vat_table = hl.import_table(vat_path, force=True, quote='"', delimiter="\t",
                            force_bgz=True, types={'position': hl.tint32})
# filter VAT to only include canonical transcripts
vat_table = vat_table.filter(vat_table["is_canonical_transcript"] == "true")

# add locus and allele as fields
vat_table = vat_table.annotate(
    locus=hl.locus(vat_table.contig, vat_table.position, reference_genome='GRCh38'),
    allele=hl.array([vat_table.ref_allele, vat_table.alt_allele]))

# key the table by locus and allele
vat_table = vat_table.key_by("locus", "allele")
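
If reading another user’s already-imported Table isn’t an option, one way to pay the sort cost only once is to write the keyed table out in Hail’s native format and read that back in every later run. A rough sketch (the bucket path is a placeholder, and this assumes you can write to your workspace bucket):

# pay the expensive sort once: write the keyed VAT in Hail native format,
# then read the pre-sorted table back in later runs
# (the path is a placeholder for your own workspace bucket)
vat_table.write('gs://<your-workspace-bucket>/vat_keyed.ht', overwrite=True)
vat_table = hl.read_table('gs://<your-workspace-bucket>/vat_keyed.ht')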

A couple of other, more minor issues:

  1. The entries table creates a coordinate-wise representation, which is inefficient and fairly large. What do you want the VCF to look like? I’m sure we can cook something up that doesn’t use entries; there’s a sketch of one option after this list.

    pv_table = mt.filter_entries(mt.GT.is_non_ref()).entries()
    
  2. Splitting a matrix table into several matrix tables, one for each ancestry, filtering the rows, and then trying to union_cols them won’t work: each matrix table ends up with a different set of rows, so union_cols can’t put them back together. What is your desired output? Do you want one VCF with all the samples, but containing only the rows where at least one ancestry has a small enough allele frequency? (See the second sketch below.)
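
On point 1, if the end goal is a VCF, one option is to keep the data as a MatrixTable and export it directly rather than materializing the entries table. This is only a sketch: packing the row annotations into an info struct is an assumption about the desired VCF layout, and the field names are taken from the code above.

# sketch: export the filtered MatrixTable directly as a VCF instead of
# exploding it into an entries table first
mt = mt.annotate_rows(info=hl.struct(gene_symbol=mt.gene_symbol,
                                     consequence=mt.consequence,
                                     revel=mt.revel))
hl.export_vcf(mt, 'lqt_variants.vcf.bgz', parallel='header_per_shard')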
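On point 2, if the answer is “one call set, keeping rows where any ancestry-specific AF clears the threshold”, that can be a single filter_rows on the original MatrixTable instead of a split-and-union. A sketch, assuming the gvs_*_af fields are strings as imported above and that a missing AF should fail the test:

# sketch: keep a row if any ancestry-specific allele frequency is at or
# below the threshold; missing AFs count as failing
af_fields = ['gvs_afr_af', 'gvs_amr_af', 'gvs_eas_af', 'gvs_eur_af',
             'gvs_mid_af', 'gvs_sas_af', 'gvs_oth_af']
mt = mt.filter_rows(
    hl.any(*[hl.coalesce(hl.float64(mt[f]) <= 0.00357, False)
             for f in af_fields]))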


Hello @danking!

I think there is a way to copy files from one workspace bucket to another if you have the exact URL of the workspace bucket.

We figured out how to filter on ancestry-based allele frequency and consequence without breaking the data up into multiple tables by using filter_entries, so I don’t think this should be a problem anymore.
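
For anyone reading along later, a rough sketch of what such a filter_entries pass might look like, reusing the field names from the code above (the threshold and missing-value handling here are illustrative assumptions, not necessarily exactly what we ran):

# sketch: look up the allele frequency matching each sample's predicted
# ancestry and filter entries in one pass, without splitting the MatrixTable
af_for_sample = hl.dict({
    a: hl.float64(LQT_WGS_mt['gvs_' + a + '_af'])
    for a in possible_ancestries
})[LQT_WGS_mt.ancestry_pred]
LQT_WGS_mt = LQT_WGS_mt.filter_entries(
    LQT_WGS_mt.GT.is_non_ref() & (af_for_sample <= 0.00357))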

Our goal is to get a final table of filtered variants for the samples where each row looks something like this:
s (sample id), locus, allele, gene_id, gene_symbol, consequence, clinvar_classification, ancestry_pred

We were using the entries table because we assumed that once we were done filtering, very few entries would remain.

Thank you so much for your help!

Hi @danking!

I just wanted to follow up and ask about the Hail-formatted VAT table. Is it possible to access it, and if not, what would be the best way to work with the VAT table in Hail?

Thanks again!