Shortening MT writing / export_vcf in Variant QC

I am in the process of running a Variant QC analysis on the UK Biobank WES dataset of 470,000 participants. I ran a preliminary test run of our script on Chromosome 21 for all 470k participants to get an understanding of total run time/cost. Chromosome 21 represents roughly 1.1% of the total WES data, and ran for an expensive 90 hours using our specified settings: 5 total nodes, each with 72 cores per node, and 144 GB of memory and 1800 GB of storage per node.

At different points in our script, I have incorporated lines writing to a MatrixTable as checkpoints for future reference. When writing to url_2 (shown below), this step takes roughly 20 minutes to run. When writing to url_3, the total run time takes roughly 68 hours. If I comment out the write to url_3, then the export_vcf step takes the same length of time (68 hours). However, if I start by reading the MT saved to url_3 generated in our inital run and complete the rest of the script, the export_vcf step takes roughly 2 hours.

Between this second MatrixTable writing step that is brief, and the final writing step that takes a long time, there are only relatively quick steps that run in a cumulative matter of seconds: 1) flagging monomorphic sites, 2) labeling high missingness variants, 3) running a HWE analysis, 4) running the batch association tests, and 5) altering the VCF metadata header. I have included the code for this section below.

I would greatly appreciate any insight or thoughts as to how I can shorten the total run time of our Variant QC workflow by decreasing the length of this final writing step. Thank you!

vcf.write(url_2, overwrite=True)
    print("Checkpoint 6.3")
    vcf = hl.read_matrix_table(url_2)
    print("Filtered Genotypes")

    # Remove sites which became monomorphic after running genotype filters
    vcf = flag_monomorphic(vcf)
    print("Flagged monomorphic sites")
    vcf = hl.variant_qc(vcf, 'qc')
    vcf = vcf.annotate_rows(
        filters=hl.cond(
            # flag variants with low call rate
            vcf.qc.call_rate <= (1 - args.missingness_threshold),
            vcf.filters.add("Missingness"),
            vcf.filters
        )
    )
    print("Flagged variants with high missingness")

    vcf = flag_hwe(vcf, args.pval_threshold, args.n_individuals)
    print("Finished HWE analysis")

    for col in batch_cols:
        print("Running batch association test for", col)
        vcf = flag_batch_association(
            vcf,
            col,
            args.pval_threshold
        )

    print("Finished Batch Associations")

    header_info = hl.get_vcf_metadata('file:///mnt/project/Exome Blocks- 2 Participants/ukb23157_c12_b30_v1_2_filtered.vcf.gz')
    header_info['filter']['PASS'] = {'Description': "All filters passed"}
    header_info['filter']['LowQual'] = {'Description': "Low quality"}
    header_info['filter']['LCR'] = {'Description': "This site falls within an annotated low-complexity region"}
    header_info['filter']['Mono'] = {'Description': "This site became monomorphic after filtering on individual genotype calls (see 'GDP', 'GAB', 'GGQ', and 'GNP' INFO fields)"}
    header_info['filter']['Singleton'] = {'Description': "This site became singleton after filtering on individual genotype calls (see 'GDP', 'GAB', 'GGQ', and 'GNP' INFO fields)"}
    header_info['filter']['Missingness'] = {'Description': "This site had a call rate below a user defined threshold after filtering genotype calls (see 'GDP', 'GAB', 'GGQ', and 'GNP' INFO fields)"}
    header_info['filter']['HWE_EUR'] = {'Description': "This autosomal (or non-pseudoautosomal X, Females only) site failed Hardy-Weinberg equilibrium expectations in EUR subpopulation"}
    header_info['filter']['HWE_EURFemale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Females in EUR subpopulation"}
    header_info['filter']['HWE_EURMale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Males in EUR subpopulation"}
    header_info['filter']['HWE_AFR'] = {'Description': "This autosomal (or non-pseudoautosomal X, Females only) site failed Hardy-Weinberg equilibrium expectations in AFR subpopulation"}
    header_info['filter']['HWE_AFRFemale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Females in AFR subpopulation"}
    header_info['filter']['HWE_AFRMale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Males in AFR subpopulation"}
    header_info['filter']['HWE_ASN'] = {'Description': "This autosomal (or non-pseudoautosomal X, Females only) site failed Hardy-Weinberg equilibrium expectations in ASN subpopulation"}
    header_info['filter']['HWE_ASNFemale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Females in ASN subpopulation"}
    header_info['filter']['HWE_ASNMale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Males in ASN subpopulation"}
    header_info['filter']['HWE_EAS'] = {'Description': "This autosomal (or non-pseudoautosomal X, Females only) site failed Hardy-Weinberg equilibrium expectations in EAS subpopulation"}
    header_info['filter']['HWE_EASFemale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Females in EAS subpopulation"}
    header_info['filter']['HWE_EASMale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Males in EAS subpopulation"}
    header_info['filter']['HWE_SAS'] = {'Description': "This autosomal (or non-pseudoautosomal X, Females only) site failed Hardy-Weinberg equilibrium expectations in SAS subpopulation"}
    header_info['filter']['HWE_SASFemale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Females in SAS subpopulation"}
    header_info['filter']['HWE_SASMale'] = {'Description': "This Pseudoautosomal X site failed Hardy-Weinberg equilibrium expectations for Males in SAS subpopulation"}
    header_info['info']['AC'] =  {'Description': 'Allele count in genotypes, for each ALT allele, in the same order as listed', 'Number': 'A', 'Type': "Integer"}
    header_info['info']['AF'] =  {'Description': 'Allele Frequency, for each ALT allele, in the same order as listed', 'Number': 'A', "Type": "Float"}
    header_info['info']['AN'] =  {'Description': 'Total number of alleles in called genotypes', 'Number': '1', 'Type': 'Integer'}
    header_info['info']['wasSplit'] =  {'Description': 'Specifies that the variant was split from a multi-allelic site', 'Number': '0', 'Type': 'Flag'}
    header_info['info']['GDP'] = {'Description': "Count of genotype calls removed per variant by the DP (depth threshold) filter", 'Number': '1', 'Type': 'Integer'}
    header_info['info']['GAB'] = {'Description': "Count of genotype calls removed per variant by the AB (allelic coverage balance) filter", 'Number': '1', 'Type': 'Integer'}
    header_info['info']['GGQ'] = {'Description': "Count of genotype calls removed per variant by the GQ (genotype quality) filter", 'Number': '1', 'Type': 'Integer'}
    header_info['info']['GNP'] = {'Description': "Count of genotype calls removed per variant by the chrX nonPar filter", 'Number': '1', 'Type': 'Integer'}
    header_info['info']['pHWE_EUR'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test in EUR sub-population. This field only applies to autosomal (or non-pseudoautosomal X, Females Only) variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_EURFemale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Females in EUR sub-population. This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_EURMale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Males in EUR sub-population.This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_AFR'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test in AFR sub-population. This field only applies to autosomal (or non-pseudoautosomal X, Females Only) variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_AFRFemale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Females in AFR sub-population. This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_AFRMale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Males in AFR sub-population.This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_ASN'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test in ASN sub-population. This field only applies to autosomal (or non-pseudoautosomal X, Females Only) variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_ASNFemale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Females in ASN sub-population. This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_ASNMale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Males in ASN sub-population.This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_EAS'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test in EAS sub-population. This field only applies to autosomal (or non-pseudoautosomal X, Females Only) variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_EASFemale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Females in EAS sub-population. This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_EASMale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Males in EAS sub-population.This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_SAS'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test in SAS sub-population. This field only applies to autosomal (or non-pseudoautosomal X, Females Only) variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_SASFemale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Females in SAS sub-population. This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}
    header_info['info']['pHWE_SASMale'] =  {'Description': 'P-value from Hardy-Weinberg Equilibrium test for Males in SAS sub-population.This field only applies to Pseudoautosomal X variants (-1 if not applicable to variant or not enough individuals in group)', 'Number': '1', 'Type': 'Float'}

    for col in batch_cols:
        header_info['filter'][col] = {'Description': "This site failed batch association with "+column_descriptions[col]}
        header_info['info']['p{}'.format(col)] = {'Description': 'P-value of variant association test with {}'.format(col), 'Number': '1', 'Type': 'Float'}

    print("Finished Metadata Changes")

    vcf.write(url_3, overwrite=True)
    vcf = hl.read_matrix_table(url_3)

    print("Finished final MT write")

    # Export the VCF with all variants
    hl.export_vcf(
        vcf,
        args.prefix+'.annotated.vcf.bgz',
        append_to_header=args.header,
        metadata=header_info
    )
def flag_hwe(ds, pval_threshold, n_participants):
    '''
    Perform sex aware HWE tests on autosomes and chrX for hg38
    '''
    populations = ds.aggregate_cols(hl.agg.counter(ds.sa.POP)) # produces a dict of population
    if None in populations:
        print("WARNING: Skipping HWE test for samples of undefined population")
        del populations[None]
    hwe_tests = {}
    # Pre-queue the 3 hwe tests for each population
    hwe_tests['hweEUR'] = hl.cond(
        # Filter 1) hwe{Pop}
        # If variant is autosomal and there are enough samples in the population, apply HWE
        ds.locus.in_autosome() & (hl.agg.count_where(ds.sa.POP == "EUR") >= n_participants),
        hl.agg.filter((ds.sa.POP == "EUR") & (ds.qc.AF[1] > 0.001), hl.agg.hardy_weinberg_test(ds.GT)),
        # If variant is not autosomal, but in X nonpar and there are enough females in the population, apply HWE
        hl.cond(
            ds.locus.in_x_nonpar() & (hl.agg.count_where((ds.sa.POP == "EUR") & (ds.sa.GENDER == "Female")) >= n_participants),
            hl.agg.filter((ds.sa.POP == "EUR") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.001), hl.agg.hardy_weinberg_test(ds.GT)),
            hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
        )
    )
    hwe_tests['hweEURFemale'] = hl.cond(
        # Filter 2) hwe{Pop}Female
        # If this is an X par variant and there are enough females in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "EUR") & (ds.sa.GENDER == "Female")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "EUR") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.001), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )
    hwe_tests['hweEURMale'] = hl.cond(
        # Filter 2) hwe{Pop}Male
        # If this is an X par variant and there are enough males in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "EUR") & (ds.sa.GENDER == "Male")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "EUR") & (ds.sa.GENDER == "Male") & (ds.qc.AF[1] > 0.001), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )
    hwe_tests['hweAFR'] = hl.cond(
        # Filter 1) hwe{Pop}
        # If variant is autosomal and there are enough samples in the population, apply HWE
        ds.locus.in_autosome() & (hl.agg.count_where(ds.sa.POP == "AFR") >= n_participants),
        hl.agg.filter((ds.sa.POP == "AFR") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        # If variant is not autosomal, but in X nonpar and there are enough females in the population, apply HWE
        hl.cond(
            ds.locus.in_x_nonpar() & (hl.agg.count_where((ds.sa.POP == "AFR") & (ds.sa.GENDER == "Female")) >= n_participants),
            hl.agg.filter((ds.sa.POP == "AFR") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
            hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
        )
    )
    hwe_tests['hweAFRFemale'] = hl.cond(
        # Filter 2) hwe{Pop}Female
        # If this is an X par variant and there are enough females in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "AFR") & (ds.sa.GENDER == "Female")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "AFR") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )
    hwe_tests['hweAFRMale'] = hl.cond(
        # Filter 2) hwe{Pop}Male
        # If this is an X par variant and there are enough males in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "AFR") & (ds.sa.GENDER == "Male")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "AFR") & (ds.sa.GENDER == "Male") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )
    hwe_tests['hweASN'] = hl.cond(
        # Filter 1) hwe{Pop}
        # If variant is autosomal and there are enough samples in the population, apply HWE
        ds.locus.in_autosome() & (hl.agg.count_where(ds.sa.POP == "ASN") >= n_participants),
        hl.agg.filter((ds.sa.POP == "ASN") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        # If variant is not autosomal, but in X nonpar and there are enough females in the population, apply HWE
        hl.cond(
            ds.locus.in_x_nonpar() & (hl.agg.count_where((ds.sa.POP == "ASN") & (ds.sa.GENDER == "Female")) >= n_participants),
            hl.agg.filter((ds.sa.POP == "ASN") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
            hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
        )
    )
    hwe_tests['hweASNFemale'] = hl.cond(
        # Filter 2) hwe{Pop}Female
        # If this is an X par variant and there are enough females in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "ASN") & (ds.sa.GENDER == "Female")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "ASN") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )
    hwe_tests['hweASNMale'] = hl.cond(
        # Filter 2) hwe{Pop}Male
        # If this is an X par variant and there are enough males in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "ASN") & (ds.sa.GENDER == "Male")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "ASN") & (ds.sa.GENDER == "Male") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )
    hwe_tests['hweEAS'] = hl.cond(
        # Filter 1) hwe{Pop}
        # If variant is autosomal and there are enough samples in the population, apply HWE
        ds.locus.in_autosome() & (hl.agg.count_where(ds.sa.POP == "EAS") >= n_participants),
        hl.agg.filter((ds.sa.POP == "EAS") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        # If variant is not autosomal, but in X nonpar and there are enough females in the population, apply HWE
        hl.cond(
            ds.locus.in_x_nonpar() & (hl.agg.count_where((ds.sa.POP == "EAS") & (ds.sa.GENDER == "Female")) >= n_participants),
            hl.agg.filter((ds.sa.POP == "EAS") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
            hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
        )
    )
    hwe_tests['hweEASFemale'] = hl.cond(
        # Filter 2) hwe{Pop}Female
        # If this is an X par variant and there are enough females in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "EAS") & (ds.sa.GENDER == "Female")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "EAS") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )
    hwe_tests['hweEASMale'] = hl.cond(
        # Filter 2) hwe{Pop}Male
        # If this is an X par variant and there are enough males in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "EAS") & (ds.sa.GENDER == "Male")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "EAS") & (ds.sa.GENDER == "Male") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )
    hwe_tests['hweSAS'] = hl.cond(
        # Filter 1) hwe{Pop}
        # If variant is autosomal and there are enough samples in the population, apply HWE
        ds.locus.in_autosome() & (hl.agg.count_where(ds.sa.POP == "SAS") >= n_participants),
        hl.agg.filter((ds.sa.POP == "SAS") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        # If variant is not autosomal, but in X nonpar and there are enough females in the population, apply HWE
        hl.cond(
            ds.locus.in_x_nonpar() & (hl.agg.count_where((ds.sa.POP == "SAS") & (ds.sa.GENDER == "Female")) >= n_participants),
            hl.agg.filter((ds.sa.POP == "SAS") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
            hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
        )
    )
    hwe_tests['hweSASFemale'] = hl.cond(
        # Filter 2) hwe{Pop}Female
        # If this is an X par variant and there are enough females in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "SAS") & (ds.sa.GENDER == "Female")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "SAS") & (ds.sa.GENDER == "Female") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )
    hwe_tests['hweSASMale'] = hl.cond(
        # Filter 2) hwe{Pop}Male
        # If this is an X par variant and there are enough males in the population, apply HWE
        ds.locus.in_x_par() & (hl.agg.count_where((ds.sa.POP == "SAS") & (ds.sa.GENDER == "Male")) >= n_participants),
        hl.agg.filter((ds.sa.POP == "SAS") & (ds.sa.GENDER == "Male") & (ds.qc.AF[1] > 0.01), hl.agg.hardy_weinberg_test(ds.GT)),
        hl.struct(het_freq_hwe=-1, p_value=-1) # Fallback: Not enough samples for this population
    )

    # execute tests
    ds = ds.annotate_rows(
        hweByPop=hl.struct(
            **hwe_tests
        )
    )

    hwe_info = {}
    # Pre-queue the 3 info annotations for each population
    # This exports variant.hwe{}.p_value to an info field variant.info.pHWE{}
    hwe_info['pHWE_EUR'] = ds.hweByPop['hweEUR'].p_value
    hwe_info['pHWE_EURFemale'] = ds.hweByPop['hweEURFemale'].p_value
    hwe_info['pHWE_EURMale'] = ds.hweByPop['hweEURMale'].p_value
    hwe_info['pHWE_AFR'] = ds.hweByPop['hweAFR'].p_value
    hwe_info['pHWE_AFRFemale'] = ds.hweByPop['hweAFRFemale'].p_value
    hwe_info['pHWE_AFRMale'] = ds.hweByPop['hweAFRMale'].p_value
    hwe_info['pHWE_ASN'] = ds.hweByPop['hweASN'].p_value
    hwe_info['pHWE_ASNFemale'] = ds.hweByPop['hweASNFemale'].p_value
    hwe_info['pHWE_ASNMale'] = ds.hweByPop['hweASNMale'].p_value
    hwe_info['pHWE_EAS'] = ds.hweByPop['hweEAS'].p_value
    hwe_info['pHWE_EASFemale'] = ds.hweByPop['hweEASFemale'].p_value
    hwe_info['pHWE_EASMale'] = ds.hweByPop['hweEASMale'].p_value
    hwe_info['pHWE_SAS'] = ds.hweByPop['hweSAS'].p_value
    hwe_info['pHWE_SASFemale'] = ds.hweByPop['hweSASFemale'].p_value
    hwe_info['pHWE_SASMale'] = ds.hweByPop['hweSASMale'].p_value

    # execute annotations
    ds = ds.annotate_rows(
        info=ds.info.annotate(
            **hwe_info
        )
    )
    # Add filter flags
    # Now flag variants based on the failed populations
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_EUR'] != -1) & (ds.info['pHWE_EUR'] < pval_threshold),
            ds.filters.add('HWE_EUR'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_EURFemale'] != -1) & (ds.info['pHWE_EURFemale'] < pval_threshold),
            ds.filters.add('HWE_EURFemale'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_EURMale'] != -1) & (ds.info['pHWE_EURMale'] < pval_threshold),
            ds.filters.add('HWE_EURMale'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_AFR'] != -1) & (ds.info['pHWE_AFR'] < pval_threshold),
            ds.filters.add('HWE_AFR'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_AFRFemale'] != -1) & (ds.info['pHWE_AFRFemale'] < pval_threshold),
            ds.filters.add('HWE_AFRFemale'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_AFRMale'] != -1) & (ds.info['pHWE_AFRMale'] < pval_threshold),
            ds.filters.add('HWE_AFRMale'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_ASN'] != -1) & (ds.info['pHWE_ASN'] < pval_threshold),
            ds.filters.add('HWE_ASN'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_ASNFemale'] != -1) & (ds.info['pHWE_ASNFemale'] < pval_threshold),
            ds.filters.add('HWE_ASNFemale'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_ASNMale'] != -1) & (ds.info['pHWE_ASNMale'] < pval_threshold),
            ds.filters.add('HWE_ASNMale'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_EAS'] != -1) & (ds.info['pHWE_EAS'] < pval_threshold),
            ds.filters.add('HWE_EAS'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_EASFemale'] != -1) & (ds.info['pHWE_EASFemale'] < pval_threshold),
            ds.filters.add('HWE_EASFemale'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_EASMale'] != -1) & (ds.info['pHWE_EASMale'] < pval_threshold),
            ds.filters.add('HWE_EASMale'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_SAS'] != -1) & (ds.info['pHWE_SAS'] < pval_threshold),
            ds.filters.add('HWE_SAS'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_SASFemale'] != -1) & (ds.info['pHWE_SASFemale'] < pval_threshold),
            ds.filters.add('HWE_SASFemale'),
            ds.filters
        )
    )
    ds = ds.annotate_rows(
        filters=hl.cond(
            (ds.info['pHWE_SASMale'] != -1) & (ds.info['pHWE_SASMale'] < pval_threshold),
            ds.filters.add('HWE_SASMale'),
            ds.filters
        )
    )
    return ds
def flag_batch_association(ds, batch, pval_threshold):
    '''
    Perform Batch Association analysis, and flag variants with significant batch
    '''
    if not hasattr(ds.row, 'BATCH'):
        ds = ds.annotate_rows(BATCH=hl.struct())
    batches = ds.aggregate_cols(hl.agg.counter(ds.sa[batch])) # produces a dict of batch: count
    print("Batches for", batch, ":", batches)
    if None in batches:
        del batches[None]
    if not len(batches):
        return ds
    ds = ds.annotate_rows(
        # Annotate variant.BATCH.{batch} = fisher test results
        BATCH=ds.BATCH.annotate(
            **{
                batch: hl.struct(
                    **{
                        i: hl.fisher_exact_test(
                            # Prepare fisher inputs
                            hl.int32(hl.agg.filter(ds.sa[batch] == i, hl.agg.sum(ds.GT.n_alt_alleles()))),
                            hl.int32(hl.agg.filter(ds.sa[batch] != i, hl.agg.sum(ds.GT.n_alt_alleles()))),
                            hl.int32(hl.agg.filter(ds.sa[batch] == i, hl.agg.sum(2 - ds.GT.n_alt_alleles()))),
                            hl.int32(hl.agg.filter(ds.sa[batch] != i, hl.agg.sum(2 - ds.GT.n_alt_alleles())))
                        )
                        for i in batches # repeat for each group in the batch
                    }
                )
            }
        )
    )
    ds = ds.annotate_rows(
        # Export batch association pvalues as an info field
        # p{batch} = minimum pvalue from any group in this batch
        info=ds.info.annotate(
            **{
                'p{}'.format(batch): hl.min(
                    [
                        ds.BATCH[batch][i].p_value
                        for i in batches
                    ]
                )
            }
        )
    )
    return ds.annotate_rows(
        # Flag failed variants based on pvalue
        filters=hl.cond(
            (ds.qc.AF[0] < .99) & (ds.qc.AF[0] > .01) & (ds.info['p{}'.format(batch)] < pval_threshold),
            ds.filters.add(batch),
            ds.filters
        )
    )