Improving pipeline performance

Hello,
Just wanted to see if there’s any way to improve pipeline performance here. Each of the write commands near the end takes a while to run (>30 min each). It seems it may be re-performing all the previous steps on each write. I'm not sure if there’s a way to avoid this?

# Import the QCed per-chromosome VCFs and persist them once as a MatrixTable.
# PERF FIX: the original read the written MT back and called .repartition(5000),
# which triggers a full shuffle of the data; requesting the partition count at
# import time (min_partitions) achieves the same layout without the shuffle.
hl.import_vcf(
    'gs://maryam_lipids/MESA_VIRGO/QCed_v2_MESA_VIRGO_TAICHI_VCF/MESA_VIRGO_TAICHI.QCed.v2.chr*.vcf.gz',
    force_bgz=True,
    reference_genome='GRCh37',
    min_partitions=5000,
).write('gs://maryam_lipids/MESA_VIRGO/SCAD/MESA_VIRGO_TAICHI_QCed.v2.allVar.mt')

mt = hl.read_matrix_table('gs://maryam_lipids/MESA_VIRGO/SCAD/MESA_VIRGO_TAICHI_QCed.v2.allVar.mt')

# Keep only samples present in the SCAD/VIRGO ID table (keyed on column "x").
sampleAnnot = hl.import_table('gs://maryam_lipids/MESA_VIRGO/SCAD/SCAD_VIR_IDs.txt', impute=True, key="x")
mt = mt.filter_cols(hl.is_defined(sampleAnnot[mt.col_key]), keep=True)

#### Drop monomorphic variants: keep sites whose alternate-allele frequency is
#### strictly between 0 and 1.
mt = hl.variant_qc(mt)
alt_af = mt.variant_qc.AF[1]
mt = mt.filter_rows((alt_af > 0) & (alt_af < 1), keep=True)

### Adding exonic VEP annotations, keyed by parsed variant string.
exonicVEP = hl.import_table(
    'gs://maryam_lipids/MESA_VIRGO/all_MESA_VIRGO_TAICHI_Var_Annot.first4Fields.v3.gz',
    force_bgz=True,
    impute=True,
)
exonicVEP = exonicVEP.key_by(**hl.parse_variant(exonicVEP.v))
mt = mt.annotate_rows(**exonicVEP.index(mt.row_key))
mt.describe()

### Adding dbNSFP annotations via the Hail experimental annotation database.
db = hl.experimental.DB()
mt2 = db.annotate_rows_db(mt, "dbNSFP_variants")
# Keep only the MetaSVM prediction, then discard the full dbNSFP struct to
# avoid carrying the large annotation payload through the pipeline.
mt2 = mt2.annotate_rows(metasvm=mt2.dbNSFP_variants.MetaSVM_pred)
mt3 = mt2.drop('dbNSFP_variants')

### Adding gnomAD exomes annotations from here: https://gnomad.broadinstitute.org/downloads
gnomad_exomes = hl.read_table('gs://gnomad-public/release/2.1.1/ht/exomes/gnomad.exomes.r2.1.1.sites.ht')
# PERF FIX: the original issued nine separate `.collect()[0]` calls on the
# `freq_index_dict` global — each one a full query. Evaluating the global
# dict once with hl.eval gives all indices in a single pass.
exome_freq_idx = hl.eval(gnomad_exomes.freq_index_dict)
joined_exomes = gnomad_exomes[mt3.row_key]
# Per-population adj allele frequencies, plus the overall raw AF.
exome_pops = ['nfe', 'fin', 'oth', 'asj', 'afr', 'eas', 'sas', 'amr']
mt4 = mt3.annotate_rows(
    **{f'{pop}_AF_e': joined_exomes.freq[exome_freq_idx[f'gnomad_{pop}']].AF
       for pop in exome_pops},
    raw_AF_e=joined_exomes.freq[exome_freq_idx['gnomad_raw']].AF,
)
mt4.rows().select().show(5)
#### gnomAD genomes annotations from here: https://gnomad.broadinstitute.org/downloads
gnomad_genomes = hl.read_table('gs://gnomad-public/release/2.1.1/ht/genomes/gnomad.genomes.r2.1.1.sites.ht')
# One hl.eval instead of eight per-key collect() queries; also avoids the
# original's re-use of the `raw_index` variable from the exomes section.
genome_freq_idx = hl.eval(gnomad_genomes.freq_index_dict)
joined_genomes = gnomad_genomes[mt4.row_key]
# NOTE: no 'sas' entry here — the original genomes block annotated one fewer
# population than the exomes block, and that set is preserved.
genome_pops = ['nfe', 'fin', 'oth', 'asj', 'afr', 'eas', 'amr']
mt5 = mt4.annotate_rows(
    **{f'{pop}_AF_g': joined_genomes.freq[genome_freq_idx[f'gnomad_{pop}']].AF
       for pop in genome_pops},
    raw_AF_g=joined_genomes.freq[genome_freq_idx['gnomad_raw']].AF,
)


### Adding DiscovEHR allele-frequency annotations (GRCh37 build).
discovEHR = hl.import_vcf('gs://maryam_lipids/MESA_VIRGO/discoverEHR_GHS_Freeze_50.L3DP10.pVCF.frq.vcf.bgz', reference_genome='GRCh37')  ###b37
# import_vcf already keys rows by (locus, alleles), so the original's
# key_rows_by call was a no-op and is dropped. Index just the two INFO fields
# we need instead of annotating the whole row struct and dropping it after.
discovehr_row = discovEHR.rows()[mt5.row_key]
mt6 = mt5.annotate_rows(
    discvrehr_af=discovehr_row.info.AF,
    discovehr_af_lt_001=discovehr_row.info.AF_LT_001,
)


# BUG FIX: describe() prints its report itself and returns None, so the
# original `print(mt6.describe())` emitted a stray "None" line.
mt6.describe()

## Writing annotated mt for all variants among SCAD cases:
mt6.write('gs://maryam_lipids/MESA_VIRGO/SCAD/VIRGO_SCADcases_exons.allVar.mt')
 
## Filtering to rare disruptive variants:
CoronaryArtery_Genes = hl.import_table('gs://migen_v8/migen_v8_QCed_VDS_v2/pli_greater_than_orEqual_0.9.andGTEXCoronaryArtery_greaterThan2.genelist.txt', no_header=True)
CoronaryArtery_Genes = CoronaryArtery_Genes.annotate(gene=CoronaryArtery_Genes.f0, CoronaryArtExpr=1).key_by('gene')

mt7 = mt6.annotate_rows(**CoronaryArtery_Genes.index(mt6.Gene))

LOF_Conseq = ["frameshift_variant", "transcript_ablation", "splice_acceptor_variant", "splice_donor_variant", "stop_gained", "start_lost"]

# Predicted deleterious: an annotated LoF consequence, or MetaSVM "D" (damaging).
deleterious = hl.literal(LOF_Conseq).contains(mt7.Consq) | mt7.metasvm.contains('D')

# Rare in gnomAD: each AF must be below its cutoff OR absent from that panel
# (`~hl.is_defined(x)` in the original == `hl.is_missing(x)`). The overall raw
# AFs use the stricter <= 0.1% cutoff; each per-population AF uses < 1%.
# Building the conjunction in a loop replaces 17 hand-copied clauses.
rare_in_gnomad = (hl.is_missing(mt7.raw_AF_e) | (mt7.raw_AF_e <= 0.001)) & \
                 (hl.is_missing(mt7.raw_AF_g) | (mt7.raw_AF_g <= 0.001))
for pop_af in [mt7.afr_AF_e, mt7.amr_AF_e, mt7.asj_AF_e, mt7.eas_AF_e,
               mt7.fin_AF_e, mt7.nfe_AF_e, mt7.sas_AF_e, mt7.oth_AF_e,
               mt7.afr_AF_g, mt7.amr_AF_g, mt7.asj_AF_g, mt7.eas_AF_g,
               mt7.fin_AF_g, mt7.nfe_AF_g, mt7.oth_AF_g]:
    rare_in_gnomad = rare_in_gnomad & (hl.is_missing(pop_af) | (pop_af < 0.01))

# Rare in DiscovEHR: AF_LT_001 flag set, or the site is absent from DiscovEHR.
rare_in_discovehr = hl.is_missing(mt7.discovehr_af_lt_001) | mt7.discovehr_af_lt_001

mt8 = mt7.filter_rows(
    deleterious
    & (mt7.CoronaryArtExpr == 1)
    & rare_in_discovehr
    & rare_in_gnomad,
    keep=True,
)



# Exporting list of filtered variants.
# BUG FIX: Table.select takes top-level field names; the original passed
# dotted strings ('info.AC', ...) which are not field names. Nested INFO
# fields must be pulled out as keyword expressions instead.
rowsmt8 = mt8.rows()
rowsmt8 = rowsmt8.select(
    'v', 'rsID', 'Consq', 'Gene', 'metasvm', 'raw_AF_g', 'raw_AF_e',
    'discovehr_af_lt_001', 'CoronaryArtExpr', 'variant_qc',
    AC=rowsmt8.info.AC, AN=rowsmt8.info.AN, AF=rowsmt8.info.AF,
)
rowsmt8.export('gs://maryam_lipids/MESA_VIRGO/SCAD/VIRGO_SCADcases_exons.AllrareDeletvariants.bgz')

# Exporting filtered variants in six collagen genes, with carrier genotypes.
COL_GENES = ["COL1A1", "COL1A2", "COL3A1", "COL5A1", "COL5A2", "COL27A1"]
mt9 = mt8.filter_rows(hl.literal(COL_GENES).contains(mt8.Gene))
# BUG FIX: the original read `mt.GT9` — a typo that referenced the unfiltered
# `mt` and a nonexistent `GT9` field. Keep entries with >= 1 alt allele.
mt9 = mt9.filter_entries(mt9.GT.n_alt_alleles() > 0)

mt10 = mt9.drop('info', 'filters', 'f0')
mt10.entries().export('gs://maryam_lipids/MESA_VIRGO/SCAD/VIRGO_SCADcases_exons.variants_and_gts.bgz')

I think the first problem is that doing .repartition(5000) will involve a shuffle, which you probably can’t do with this data. It’s better to use the min_partitions argument on import_vcf to read with more partitions.

If the data is already written and you don’t want to rewrite it, you can do this to repartition and write (then delete the old one):

mt = hl.read_matrix_table(path)
# Compute interval bounds for 5000 partitions without shuffling the data.
intervals = mt._calculate_new_partitions(5000)

# Re-read using those intervals and write to a new path (delete the old MT after).
hl.read_matrix_table(path, _intervals=intervals).write(path2)

Thanks @tpoterba!

Hello,

I found this thread and I have a (probably dumb) question related to this: in gnomad genome hail table I also want to get specific indexes of freq but I see some discrepancies between the number of values in freq and the number of indexes.

gnomad = hl.read_table('gs://gnomad-public/release/2.1.1/ht/genomes/gnomad.genomes.r2.1.1.sites.ht')

If I check the number of indexes with `len(hl.eval(gnomad.freq_index_dict.keys()))` I find 116, but if I check the number of `AC` values, for example, I find 214 values, so I don’t understand what the extra entries could be.

hl.len(gnomad.freq.AC).show()
2020-11-17 13:37:20 Hail: INFO: reading 1 of 10000 data partitions
+---------------+------------+--------+
| locus         | alleles    | <expr> |
+---------------+------------+--------+
| locus<GRCh37> | array<str> |  int32 |
+---------------+------------+--------+
| 1:861324      | ["G","A"]  |    214 |
| 1:861327      | ["C","T"]  |    214 |
| 1:861333      | ["G","A"]  |    214 |
| 1:861337      | ["C","G"]  |    214 |
| 1:861349      | ["C","T"]  |    214 |
| 1:861353      | ["C","T"]  |    214 |
| 1:861369      | ["G","A"]  |    214 |
| 1:861383      | ["C","A"]  |    214 |
+---------------+------------+--------+

I would like to be sure that the indexes I get if I use gnomad.freq_index_dict are the right ones.

Thank you in advance for your help!