Annotating a non-split VDS using a split VDS


#1

I wrote these python functions to annotate a VDS that wasn’t split with a VDS that was split, accounting for minrep of alleles, etc. Since I feel that this might be of interest to others, here it is:

An example of its usage would be

annotate_non_split_from_split(hc, non_split_vds_path='gs://mybucket/my.vds',
                                      split_vds=hc.read("gs://mybucket/split_vds"),
                                      annotations=['va.annotation1','va.foo.annotation2'])

Where:

  • hc is the HailContext
  • the annotations are annotations in the split VDS that will be annotated in the non-split VDS.
    This will return a new VDS with va.annotation1 and va.foo.annotation2 containing the values from the split VDS for each of the alleles at the site in their corresponding order. The type of these will be an Array of the type of each annotation in the split VDS and of length equal to the number of ALT alleles. If an allele is missing from the split VDS, the corresponding entries in the Array contain NA. If an entire site is missing from the split VDS, then the annotation is missing (NA).
def getAnnType(annotation, schema):
    ann_path = annotation.split(".")[1:]
    ann_type = schema
    for p in ann_path:
        ann_type = [x for x in ann_type.fields if x.name == p][0].typ
    return ann_type

def annotate_non_split_from_split(hc, non_split_vds_path, split_vds, annotations):

    ann_types = list(map(lambda x: str(getAnnType(x,split_vds.variant_schema)), annotations))

    variant_annotated_vds = (
        hc.read(non_split_vds_path, sites_only=True)
        .annotate_variants_expr('va.variant = v')
        .split_multi()
    )

    ann_agg_codes = ["`%s` = index(va.map(x => {val: %s, aIndex: va.aIndex}).collect(), aIndex)" % (a, a) for a in annotations]
    agg = (
        split_vds
            .annotate_variants_vds(variant_annotated_vds, 'va.variant = vds.variant, va.aIndex = vds.aIndex')
            .filter_variants_expr('isDefined(va.variant)')
            .variants_keytable()
            .aggregate_by_key(key_condition='variant = va.variant', agg_condition=",".join(ann_agg_codes))

     )

    ann_codes = ['%s = let x = table.`%s` in' \
                 ' range(table.variant.nAltAlleles).map(i => if(x.contains(i+1)) x[i+1].val else NA: %s)' % (a, a, b)
                 for (a, b) in zip(annotations, ann_types)]

    return (
        hc.read(non_split_vds_path)
            .annotate_variants_keytable(agg, ",".join(ann_codes))
    )

#2

Thanks @lfrancioli for those functions, they have really come in handy for me. I tweaked annotate_non_split_from_split a bit to work with the v0.1 API, and have included the result below in case it’s useful for someone else:

def annotate_non_split_from_split(hc, non_split_vds_path, split_vds, annotations):
    ann_types = list(map(lambda x: str(getAnnType(x, split_vds.variant_schema)), annotations))

    variant_annotated_vds = (hc
        .read(non_split_vds_path, drop_samples=True)
        .annotate_variants_expr('va.variant = v, va.nAltAlleles = v.nAltAlleles()')
        .split_multi()
    )

    ann_agg_codes = ["`%s` = index(va.map(x => {val: %s, aIndex: va.aIndex}).collect(), aIndex)" % (a, a) for a in annotations]

    agg = (split_vds
        .annotate_variants_vds(variant_annotated_vds, 'va.variant = vds.variant, va.aIndex = vds.aIndex, va.nAltAlleles = vds.nAltAlleles')
        .filter_variants_expr('isDefined(va.variant)')
        .variants_table().repartition(10000)
        .aggregate_by_key('variant = va.variant', ann_agg_codes + ['nAltAlleles = va.map(x => x.nAltAlleles).take(1)[0]'])
    )

    ann_codes = ['%s = let x = table.`%s` in' \
                 ' range(table.nAltAlleles).map(i => if(x.contains(i+1)) x[i+1].val else NA: %s)' % (a, a, b)
                 for (a, b) in zip(annotations, ann_types)]

    return (hc
        .read(non_split_vds_path)
        .annotate_variants_table(agg, expr=",".join(ann_codes))
    )

The table.repartition(10000) step for me seemed to help resolve lost executor failures for my large dataset (~ 74 million variants, complex annotations).


#3

Ah, I can see how the repartition would help there. Our aggregate_by_key function should support a partitions argument, which would make the repartition part “free”. Thanks for updating this for 0.1!