I wrote these python functions to annotate a VDS that wasn’t split with a VDS that was split, accounting for minrep of alleles, etc. Since I feel that this might be of interest to others, here it is:
An example of its usage would be
annotate_non_split_from_split(hc, non_split_vds_path='gs://mybucket/my.vds',
split_vds=hc.read("gs://mybucket/split_vds"),
annotations=['va.annotation1','va.foo.annotation2'])
Where:
- hc is the HailContext
- the annotations are annotations in the split VDS that will be annotated in the non-split VDS.
This will return a new VDS withva.annotation1
andva.foo.annotation2
containing the values from the split VDS for each of the alleles at the site in their corresponding order. The type of these will be anArray
of the type of each annotation in the split VDS and of length equal to the number of ALT alleles. If an allele is missing from the split VDS, the corresponding entries in the Array containNA
. If an entire site is missing from the split VDS, then the annotation is missing (NA
).
def getAnnType(annotation, schema):
ann_path = annotation.split(".")[1:]
ann_type = schema
for p in ann_path:
ann_type = [x for x in ann_type.fields if x.name == p][0].typ
return ann_type
def annotate_non_split_from_split(hc, non_split_vds_path, split_vds, annotations):
ann_types = list(map(lambda x: str(getAnnType(x,split_vds.variant_schema)), annotations))
variant_annotated_vds = (
hc.read(non_split_vds_path, sites_only=True)
.annotate_variants_expr('va.variant = v')
.split_multi()
)
ann_agg_codes = ["`%s` = index(va.map(x => {val: %s, aIndex: va.aIndex}).collect(), aIndex)" % (a, a) for a in annotations]
agg = (
split_vds
.annotate_variants_vds(variant_annotated_vds, 'va.variant = vds.variant, va.aIndex = vds.aIndex')
.filter_variants_expr('isDefined(va.variant)')
.variants_keytable()
.aggregate_by_key(key_condition='variant = va.variant', agg_condition=",".join(ann_agg_codes))
)
ann_codes = ['%s = let x = table.`%s` in' \
' range(table.variant.nAltAlleles).map(i => if(x.contains(i+1)) x[i+1].val else NA: %s)' % (a, a, b)
for (a, b) in zip(annotations, ann_types)]
return (
hc.read(non_split_vds_path)
.annotate_variants_keytable(agg, ",".join(ann_codes))
)