Hopefully this is an easy one, but I’m trying to work through a process like this:
- Group a MatrixTable (MT) by a few mutually exclusive sets of samples (i.e. populations)
- In other words, if I have 3000 samples then there might be 3 populations, each of size ~500 (it’s not an exhaustive partitioning and none of the samples in one group are in another)
- For each of those populations, run `variant_qc` and determine which variants pass some filters
- Filter the original MT to only variants that pass the filters in all populations
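For a self-contained repro, this is roughly the setup I’m testing against; `balding_nichols_model` here is just a stand-in for my real data, with the `population` label derived from its simulated `pop` field:

```python
import functools

import hail as hl

hl.init()

# Stand-in for the real dataset: simulate 3 populations of samples and
# label each column with a 'group1'/'group2'/'group3' population string
mt = hl.balding_nichols_model(n_populations=3, n_samples=3000, n_variants=1000)
mt = mt.annotate_cols(population='group' + hl.str(mt.pop + 1))
```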
My mental model for this looks like:
```python
populations = ['group1', 'group2', 'group3']

def run_qc(mt):
    masks = []
    for p in populations:
        # Subset out my population (i.e. run a column-only filter)
        mtp = hl.variant_qc(mt.filter_cols(mt.population == p))
        # Add a BooleanExpression of length equal to num variants
        masks.append(mtp.variant_qc.p_value_hwe > 10e-6)
    # Combine the boolean masks with some binary operator (AND in this case)
    # * This will fail though since the boolean expressions are derived from different tables *
    mask = functools.reduce(lambda m1, m2: m1 & m2, masks)
    # Use the combined mask to filter the rows in the original mt
```
My first take at converting this logic into something Hail-compatible, which adds the masks as row annotations instead, looked like this:
```python
def run_qc(mt):
    for i, p in enumerate(populations):
        mtp = hl.variant_qc(mt.filter_cols(mt.population == p))
        # Attach the boolean expression as a new row annotation on the original MT
        mt = mt.annotate_rows(**{
            'mask' + str(i): mtp.rows()[mt.row_key].variant_qc.p_value_hwe > 10e-6
        })
    # Filter the original MT using the conjunction of the masks
    masks = [mt['mask' + str(i)] for i in range(len(populations))]
    return mt.filter_rows(functools.reduce(lambda m1, m2: m1 & m2, masks))
```
That works and seems to do what I want. What I’ve learned from looking at the execution plan, though, is that each additional population adds two new single-stage jobs, and execution times blow up quickly even on a small 150k-variant by 1,350-sample dataset (each new population seems to correspond to one more RDD run pairwise through `zipPartitions` in a big RVD fold as the final stage).
I’d like to get the same kind of behavior I see with the code below: even though it’s calculating something for ALL samples, it still executes very quickly regardless of the number of arbitrary mask computations I add, and it does so in a single stage, which seems reasonable if all the call-matrix data for a single variant is in an in-memory array spanning the samples:
```python
def run_qc(mt):
    for i, p in enumerate(populations):
        # Add small increment to each threshold to make sure this
        # doesn't degenerate to a single calculation somehow
        mt = mt.annotate_rows(
            **{'mask' + str(i): hl.agg.fraction(hl.is_defined(mt.GT)) > .9 + .001 * i})
    masks = [mt['mask' + str(i)] for i in range(len(populations))]
    return mt.filter_rows(functools.reduce(lambda m1, m2: m1 & m2, masks))
```
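For reference, this is roughly how I’m timing both versions; `count_rows` just forces the otherwise-lazy pipeline to execute:

```python
import time

# Time the full pipeline for a growing number of mask computations
for n in (2, 4, 8):
    populations = ['group' + str(i + 1) for i in range(n)]
    start = time.time()
    n_kept = run_qc(mt).count_rows()  # forces execution
    print('%d populations: kept %d variants in %.1fs'
          % (n, n_kept, time.time() - start))
```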
That last example runs in 3 seconds for 8 populations, while the one above it takes 90 seconds, even though it seems like it should be doing a lot more work. It isn’t actually grouping by population like I want, but I can’t see how to do that with `group_cols_by` + `aggregate_cols`. Is there a more idiomatic approach for this?
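For what it’s worth, the closest single-pass version I can imagine is one `annotate_rows` with `hl.agg.group_by`, along the lines of the sketch below, but I haven’t been able to convince myself it’s the intended pattern (and `hl.agg.hardy_weinberg_test` is just my guess at the per-group equivalent of the `variant_qc` HWE p-value):

```python
def run_qc_grouped(mt):
    # One entries aggregation per variant: HWE test within each population
    mt = mt.annotate_rows(
        hwe_by_pop=hl.agg.group_by(mt.population,
                                   hl.agg.hardy_weinberg_test(mt.GT)))
    # Keep variants whose p-value passes the threshold in every population
    return mt.filter_rows(hl.literal(populations).all(
        lambda p: mt.hwe_by_pop[p].p_value > 10e-6))
```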
Thank you!