Improving pipeline performance

Hello,
Just wanted to see if there’s any way to improve pipeline performance here. Each of the write commands near the end takes a while to run (>30 min each). It seems it may be re-performing all the previous steps on each write. Not sure if there’s a way to avoid this?

import hail as hl

hl.import_vcf('gs://maryam_lipids/MESA_VIRGO/QCed_v2_MESA_VIRGO_TAICHI_VCF/MESA_VIRGO_TAICHI.QCed.v2.chr*.vcf.gz',
              force_bgz=True, reference_genome='GRCh37') \
  .write('gs://maryam_lipids/MESA_VIRGO/SCAD/MESA_VIRGO_TAICHI_QCed.v2.allVar.mt')

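# Read the written MatrixTable back and repartition it to 5000 partitions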
mt = hl.read_matrix_table('gs://maryam_lipids/MESA_VIRGO/SCAD/MESA_VIRGO_TAICHI_QCed.v2.allVar.mt').repartition(5000)

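# Keep only the samples listed in the SCAD ID table (keyed by column "x")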
sampleAnnot = hl.import_table('gs://maryam_lipids/MESA_VIRGO/SCAD/SCAD_VIR_IDs.txt', impute=True, key="x")
mt = mt.filter_cols(hl.is_defined(sampleAnnot[mt.col_key]), keep=True)

#### Remove monomorphic variants
mt = hl.variant_qc(mt)
mt = mt.filter_rows((mt.variant_qc.AF[1] > 0) & (mt.variant_qc.AF[1] < 1), keep=True)

### Adding exonic VEP annotations
exonicVEP=hl.import_table('gs://maryam_lipids/MESA_VIRGO/all_MESA_VIRGO_TAICHI_Var_Annot.first4Fields.v3.gz', force_bgz=True, impute=True)
exonicVEP = exonicVEP.key_by(**hl.parse_variant(exonicVEP.v))
mt = mt.annotate_rows(**exonicVEP.index(mt.row_key))
mt.describe()

### Adding dbnsfp annotations
db = hl.experimental.DB()
mt2 = db.annotate_rows_db(mt, "dbNSFP_variants")
#print(mt2.describe())
mt2 = mt2.annotate_rows(metasvm = mt2.dbNSFP_variants.MetaSVM_pred)
mt3 = mt2.drop('dbNSFP_variants')

### Adding gnomad exomes annotations from here: https://gnomad.broadinstitute.org/downloads
gnomad_exomes = hl.read_table('gs://gnomad-public/release/2.1.1/ht/exomes/gnomad.exomes.r2.1.1.sites.ht') #.select('freq','popmax')
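# Look up each population's position in gnomAD's 'freq' array (note: every collect() triggers a small Hail job)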
gnomad_nfe_index = gnomad_exomes.freq_index_dict['gnomad_nfe'].collect()[0]
gnomad_fin_index = gnomad_exomes.freq_index_dict['gnomad_fin'].collect()[0]
gnomad_oth_index = gnomad_exomes.freq_index_dict['gnomad_oth'].collect()[0] 
gnomad_asj_index = gnomad_exomes.freq_index_dict['gnomad_asj'].collect()[0]
gnomad_afr_index = gnomad_exomes.freq_index_dict['gnomad_afr'].collect()[0]
gnomad_eas_index = gnomad_exomes.freq_index_dict['gnomad_eas'].collect()[0]
gnomad_sas_index = gnomad_exomes.freq_index_dict['gnomad_sas'].collect()[0]
gnomad_amr_index = gnomad_exomes.freq_index_dict['gnomad_amr'].collect()[0]
raw_index = gnomad_exomes.freq_index_dict['gnomad_raw'].collect()[0]
joined_exomes = gnomad_exomes[mt3.row_key]

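# Annotate variants with per-population gnomAD exome allele frequencies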
mt4 = mt3.annotate_rows(nfe_AF_e=joined_exomes.freq[gnomad_nfe_index].AF,
                        fin_AF_e=joined_exomes.freq[gnomad_fin_index].AF,
                        oth_AF_e=joined_exomes.freq[gnomad_oth_index].AF,
                        asj_AF_e=joined_exomes.freq[gnomad_asj_index].AF,
                        afr_AF_e=joined_exomes.freq[gnomad_afr_index].AF,
                        eas_AF_e=joined_exomes.freq[gnomad_eas_index].AF,
                        sas_AF_e=joined_exomes.freq[gnomad_sas_index].AF,
                        amr_AF_e=joined_exomes.freq[gnomad_amr_index].AF,
                        raw_AF_e=joined_exomes.freq[raw_index].AF)
mt4.rows().select().show(5)
#### Gnomad genomes annotations from here: https://gnomad.broadinstitute.org/downloads
gnomad_genomes = hl.read_table('gs://gnomad-public/release/2.1.1/ht/genomes/gnomad.genomes.r2.1.1.sites.ht') #.select('freq','popmax')
adj_nfe_index = gnomad_genomes.freq_index_dict['gnomad_nfe'].collect()[0]
adj_fin_index = gnomad_genomes.freq_index_dict['gnomad_fin'].collect()[0]
adj_oth_index = gnomad_genomes.freq_index_dict['gnomad_oth'].collect()[0]
adj_asj_index = gnomad_genomes.freq_index_dict['gnomad_asj'].collect()[0]
adj_afr_index = gnomad_genomes.freq_index_dict['gnomad_afr'].collect()[0]
adj_eas_index = gnomad_genomes.freq_index_dict['gnomad_eas'].collect()[0]
adj_amr_index = gnomad_genomes.freq_index_dict['gnomad_amr'].collect()[0]
raw_index = gnomad_genomes.freq_index_dict['gnomad_raw'].collect()[0]
joined_genomes = gnomad_genomes[mt4.row_key]
mt5 = mt4.annotate_rows(nfe_AF_g=joined_genomes.freq[adj_nfe_index].AF,
                        fin_AF_g=joined_genomes.freq[adj_fin_index].AF,
                        oth_AF_g=joined_genomes.freq[adj_oth_index].AF,
                        asj_AF_g=joined_genomes.freq[adj_asj_index].AF,
                        afr_AF_g=joined_genomes.freq[adj_afr_index].AF,
                        eas_AF_g=joined_genomes.freq[adj_eas_index].AF,
                        amr_AF_g=joined_genomes.freq[adj_amr_index].AF,
                        raw_AF_g=joined_genomes.freq[raw_index].AF)


### Adding DiscovEHR annotations
discovEHR = hl.import_vcf('gs://maryam_lipids/MESA_VIRGO/discoverEHR_GHS_Freeze_50.L3DP10.pVCF.frq.vcf.bgz',reference_genome='GRCh37') ###b37

discover = discovEHR.key_rows_by(locus=discovEHR.locus,alleles=discovEHR.alleles)
#discovEHR.describe()
mt6 = mt5.annotate_rows(discvrehr = discover.index_rows(mt5.row_key))
mt6 = mt6.annotate_rows(discvrehr_af=mt6.discvrehr.info.AF, discovehr_af_lt_001=mt6.discvrehr.info.AF_LT_001)
mt6 = mt6.drop('discvrehr')


mt6.describe()

## Writing annotated mt for all variants among SCAD cases: 
mt6.write('gs://maryam_lipids/MESA_VIRGO/SCAD/VIRGO_SCADcases_exons.allVar.mt')
 
## Filtering to rare disruptive variants: 
CoronaryArtery_Genes = hl.import_table('gs://migen_v8/migen_v8_QCed_VDS_v2/pli_greater_than_orEqual_0.9.andGTEXCoronaryArtery_greaterThan2.genelist.txt',no_header=True)
CoronaryArtery_Genes = CoronaryArtery_Genes.annotate(gene = CoronaryArtery_Genes.f0, CoronaryArtExpr = 1).key_by('gene')

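# Flag variants in genes from the high-pLI, coronary-artery-expressed gene list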
mt7 = mt6.annotate_rows(**CoronaryArtery_Genes.index(mt6.Gene))
LOF_Conseq = ["frameshift_variant", "transcript_ablation" , "splice_acceptor_variant", "splice_donor_variant", "stop_gained", "start_lost"]
mt8 = mt7.filter_rows((hl.literal(LOF_Conseq).contains(mt7.Consq) | mt7.metasvm.contains('D')) &
                      (mt7.CoronaryArtExpr == 1) &
                      ((mt7.discovehr_af_lt_001) | ~hl.is_defined(mt7.discovehr_af_lt_001)) &
                      ((mt7.raw_AF_e <= 0.001) | ~hl.is_defined(mt7.raw_AF_e)) &
                      ((mt7.afr_AF_e < 0.01) | ~hl.is_defined(mt7.afr_AF_e)) &
                      ((mt7.amr_AF_e < 0.01) | ~hl.is_defined(mt7.amr_AF_e)) &
                      ((mt7.asj_AF_e < 0.01) | ~hl.is_defined(mt7.asj_AF_e)) &
                      ((mt7.eas_AF_e < 0.01) | ~hl.is_defined(mt7.eas_AF_e)) &
                      ((mt7.fin_AF_e < 0.01) | ~hl.is_defined(mt7.fin_AF_e)) &
                      ((mt7.nfe_AF_e < 0.01) | ~hl.is_defined(mt7.nfe_AF_e)) &
                      ((mt7.sas_AF_e < 0.01) | ~hl.is_defined(mt7.sas_AF_e)) &
                      ((mt7.oth_AF_e < 0.01) | ~hl.is_defined(mt7.oth_AF_e)) &
                      ((mt7.raw_AF_g <= 0.001) | ~hl.is_defined(mt7.raw_AF_g)) &
                      ((mt7.afr_AF_g < 0.01) | ~hl.is_defined(mt7.afr_AF_g)) &
                      ((mt7.amr_AF_g < 0.01) | ~hl.is_defined(mt7.amr_AF_g)) &
                      ((mt7.asj_AF_g < 0.01) | ~hl.is_defined(mt7.asj_AF_g)) &
                      ((mt7.eas_AF_g < 0.01) | ~hl.is_defined(mt7.eas_AF_g)) &
                      ((mt7.fin_AF_g < 0.01) | ~hl.is_defined(mt7.fin_AF_g)) &
                      ((mt7.nfe_AF_g < 0.01) | ~hl.is_defined(mt7.nfe_AF_g)) &
                      ((mt7.oth_AF_g < 0.01) | ~hl.is_defined(mt7.oth_AF_g)), keep=True)



#Exporting list of filtered variants: 
# 'info' is a struct, so its subfields are pulled out with keyword expressions
rows8 = mt8.rows()
rowsmt8 = rows8.select('v', 'rsID', 'Consq', 'Gene', 'metasvm', 'raw_AF_g', 'raw_AF_e',
                       'discovehr_af_lt_001', 'CoronaryArtExpr', 'variant_qc',
                       AC=rows8.info.AC, AN=rows8.info.AN, AF=rows8.info.AF)
rowsmt8.export('gs://maryam_lipids/MESA_VIRGO/SCAD/VIRGO_SCADcases_exons.AllrareDeletvariants.bgz')

#Exporting list of filtered variants that are among 6 collagen genes: 
COL_GENES = ["COL1A1", "COL1A2","COL3A1", "COL5A1","COL5A2", "COL27A1"]
mt9 = mt8.filter_rows(hl.literal(COL_GENES).contains(mt8.Gene))
mt9 = mt9.filter_entries(mt9.GT.n_alt_alleles() > 0)

mt10 = mt9.drop('info', 'filters', 'f0')
mt10.entries().export('gs://maryam_lipids/MESA_VIRGO/SCAD/VIRGO_SCADcases_exons.variants_and_gts.bgz')

I think the first problem is that doing .repartition(5000) involves a shuffle, which you probably can’t do reliably with data this size. It’s better to use the min_partitions argument on import_vcf to read with more partitions in the first place.
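For example (a sketch using the paths from your first step):

mt = hl.import_vcf('gs://maryam_lipids/MESA_VIRGO/QCed_v2_MESA_VIRGO_TAICHI_VCF/MESA_VIRGO_TAICHI.QCed.v2.chr*.vcf.gz',
                   force_bgz=True, reference_genome='GRCh37',
                   min_partitions=5000)  # ask for at least 5000 partitions at import time; no shuffle needed
mt.write('gs://maryam_lipids/MESA_VIRGO/SCAD/MESA_VIRGO_TAICHI_QCed.v2.allVar.mt')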

If the data is already written and you don’t want to rewrite it, you can do this to repartition and write (then delete the old one):

mt = hl.read_matrix_table(path)
# compute intervals that split the existing data into 5000 roughly equal partitions
intervals = mt._calculate_new_partitions(5000)

# re-read using those intervals and write the repartitioned copy to a new path
hl.read_matrix_table(path, _intervals=intervals).write(path2)
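On the re-execution point: Hail is lazy, so each write or export re-runs everything upstream of it, and since the script keeps using mt6 after writing it, the later exports replay all the annotation joins. Writing and reading back (or the checkpoint method, which is write-then-read in one call) lets downstream steps start from the saved result. A minimal sketch, replacing the mt6.write(...) near the end:

mt6 = mt6.checkpoint('gs://maryam_lipids/MESA_VIRGO/SCAD/VIRGO_SCADcases_exons.allVar.mt')
# the collagen-gene filtering and exports below now read from disk instead of recomputing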

Thanks @tpoterba!