Issues with sample and variant QC by group

Hopefully, this an easy one but I’m trying to work through a process like this:

  1. Group a MatrixTable (MT) by a few mutually exclusive sets of samples (i.e. populations)
    • In other words, if I have 3000 samples then there might be 3 populations, each of size ~500 (it’s not an exhaustive partitioning and none of the samples in one group are in another)
  2. For each of those populations, run variant_qc and determine which variants pass some filters
  3. Filter the original MT to only variants that pass the filters in all populations

My mental model for this looks like:

populations = ['group1', 'group2', 'group3']

def run_qc(mt):
    masks = []
    for p in populations:
        # Subset out my population (i.e. run a column-only filter)
        mtp = hl.variant_qc(mt.filter_cols(mt.population == p))
        # Add a BooleanExpression of length equal to num variants
        masks.append(mtp.variant_qc.p_value_hwe > 10e-6)

    # Combine the boolean masks with same binary operator (AND in this case)
    # * This will fail though since the boolean expressions are derived from different tables *
    mask = functools.reduce(lambda m1, m2: m1 & m2, masks)
    # Use the combined mask to filter the rows in the original mt

My first take at converting this logic to something Hail-compatible, that adds the masks as annotations instead, looked like this:

def run_qc(mt):
    masks = {}
    for i, p in enumerate(populations):
        mtp = hl.variant_qc(mt.filter_cols(mt.population == p))
        # Attach the boolean expression as a new column on the original MT
        mt = mt.annotate_rows(**{'mask' + str(i): 
            mtp.rows()[mt.row_key].variant_qc.p_value_hwe > 10e-6
        })
    # Filter the original MT using the conjunction of the masks
    masks = [mt['mask' + str(i)] for i in range(len(populations))]
    return mt.filter_rows(functools.reduce(lambda m1, m2: m1 & m2, masks))

That works and seems to do what I want. What I’ve learned from looking at the execution plan though is that for each additional population, two new single-stage jobs are added and the execution times blow up quickly even on a small 150k variant by 1350 sample dataset (each new population seems to correspond to one more RDD to run pairwise through zipPartitions in a big RVD fold as the final stage).

I’d like to get the same kind of behavior as I see with this code where even though it’s calculating something for ALL samples, it still executes very quickly regardless of the number of arbitrary mask computations I add and it does so in a single stage – which seems reasonable if all the call matrix data for a single variant is in an in-memory array spanning the samples:

def run_qc(mt):
    for i, p in enumerate(populations):
        # Add small increment to each threshold to make sure this 
        # doesn't degenerate to a single calculation somehow
        mt = mt.annotate_rows(**{'mask' + str(i): hl.agg.fraction(hl.is_defined(mt.GT)) > .9 + .001 * i})
    masks =  [mt['mask' + str(i)] for i in range(len(populations))]
    return mt.filter_rows(functools.reduce(lambda m1, m2: m1 & m2, masks))

That last example runs in 3 seconds for 8 populations while the one above it takes 90 seconds, even though it seems like it should be doing a lot more work. It isn’t actually grouping by population like I want, but I can’t see how to do that with group_cols_by + aggregate_cols. Is there a more idiomatic approach for this?

Thank you!

1 Like

This bit:

        mtp = hl.variant_qc(mt.filter_cols(mt.population == p))
        # Attach the boolean expression as a new column on the original MT
        mt = mt.annotate_rows(**{'mask' + str(i): 
            mtp.rows()[mt.row_key].variant_qc.p_value_hwe > 10e-6
        })

is forking the Hail computation graph, meaning that to compute the result, the parent must be computed twice. We had something in variant_qc and sample_qc that was doing this same type of thing when you ran your benchmarks first, and it certainly leads to exponential growth in runtime.

The solution here is to use a single annotate_rows that computes the hwe p value yourself. Here we can do it in one pass using agg.group_by:

mt = mt.annotate_rows(
    hwe_pops = hl.agg.group_by(mt.population, 
                               hl.agg.hardy_weinberg_test(mt.GT)))

We could also expose something that’s a variant_qc aggregator (it’s implemented in terms of other aggregators). For technical reasons we couldn’t have this compute call rate, though, I think.

You can do call rate by aggregating over columns counting the occurrences of populations, and then using that as the denominator of the call rate computation.

Thank you as usual @tpoterba! I was thinking about it more and had two higher level follow up questions:

Axis Precedence

What are the rules for determining the axis over which some aggregation applies?

If I had an expression like this:

hl.agg.group_by(
    # Group by something in rows and columns
    hl.struct(c=mt.locus.contig, p=mt.pop), 
    # It isn't clear yet whether this sum will be over all elements of a 2D matrix or across one axis
    hl.agg.sum(mt.GT.n_alt_alleles())
)
<DictExpression of type dict<struct{p: int32, c: str}, int64>>

in what other context does this expression become usable? Are Table.annotate and MatrixTable.annotate_{rows,cols} the only functions for which the aggregation axis becomes unambiguous? I see notes like “Aggregation index may be promoted based on context” in the expression description but I’m curious what all could constitute the context for promotion (and what promotion means).

Aggregation Methods

I’m also trying to wrap my head around all the possible ways to do some kind of aggregation over entries in an MT, and it seems like these are all plausible within the API:

  • Look for existing higher-order functions in hail.methods that already wrap up the aggregation logic you want (like variant_qc)
  • MatrixTable.aggregate_{rows,cols} with hl.agg.group_by
    • Example:
      mt.aggregate_rows(hl.agg.group_by(
         mt.pop, hl.agg.sum(mt.GT.n_alt_alleles())
      ))
      
    • I can see why this doesn’t make sense to actually use, but it still seems to be valid given that the signature for MatrixTable.aggregate_* methods only require an Expression of some kind
    • In this case that code throws a cryptic error: KeyError: 'g'
  • MatrixTable.group_{rows,cols}_by followed with an expression passed to GroupedMatrixTable.aggregate
  • MatrixTable.annotate_{rows,cols} with hl.agg.group_by and an appropriate expression per group
  • MatrixTable.entries().{group_by, aggregate, annotate}
    • This also seems to be a reasonable way to get row or column wise groupings followed by aggregations over the entry data
  • MatrixTable.{rows, cols} followed by group by functions that map back to the original entry data in the MT
    • Example:
      mtr = mt.rows()
      mtr.aggregate(hl.agg.group_by(
         mtr.locus.contig, 
         hl.agg.sum(mt.entries().key_by('locus', 'alleles').index(mtr.key).GT.n_alt_alleles())
      ))
      
    • I was a little surprised that this works but it does seem to be another possibility
  • Use hl.agg.group_by from the start to create some DictExpression – maybe there is some way to use expressions like this outside of the context of a table?
  • Convert some expression over the MT entries to float64, convert that to BlockMatrix and then use BlockMatrix.{filter_rows,filter_cols} or slicing in addition to math functions (sum, sqrt, etc.) to get grouped calculation results
  • Use MatrixTable.{filter_rows,filter_cols} with calculations applied to each subset
    • This seems like it would be a clear anti-pattern, but I thought it wasn’t entirely impossible for the IR to compile down to backend instructions that execute the same way as a more explicit grouping function

Are there some general guidelines that a user should keep in mind that might simplify how to think about all the different ways to do this?

These are good questions, we should take some of this discussion and push it into the docs in an “aggregation overview” kind of place.

An expression has axes that it is defined over. mt.locus.contig has the row axis, mt.pop has column axis, and hl.struct(c=mt.locus.contig, p=mt.pop) has both the row and column axis (as does mt.GT, in general).

This expression could appear inside an aggregation in a function that aggregates over entries. The aggregation scope is set from the outside – if you have:

n = hl.agg.count()

this has different meanings depending on where it appears:

mt.aggregate_rows(n) -> number of rows
mt.aggregate_cols(n) -> number of cols
mt.aggregate_entries(n) -> number of non-filtered_entries
mt.annotate_rows(n=n) -> number of non-filtered_entries per row
etc

The business about ‘promotion’ means that you need to always be thinking about the aggregation context as the source of what’s being aggregated over. We’ve had people have an expression like:

n = hl.agg.count_where(mt.variant_is_loss_of_function)

and it can be confusing when they get n_samples * n_lof when they use:

mt.aggregate_entries(n)

Even though the expressions in the aggregation have only row axis (not row, col as aggregate_entries supports), we still aggregate over each entry (where the entries for each row will contribute the same value).

You’re spot on for most of this. I wouldn’t think of hl.agg.group_by as a separate “kind of aggregation” though, it’s just a compound aggregator that operates over the same aggregation context defined by the aggregation function. You’re missing mt.aggregate_entries which is equivalent to mt.entries().aggregate.

This is a bad error message bug, we probably have something wrong in how the axes of group_by are tracked.

Don’t do this! it’s both forking the graph (as in your top post in this thread) and it’s not doing what you expect anyhow. Table.index is a left-distinct join by default (you can collect all matching rows with all_matches=True) so even though this code will run, it’s going to choose an arbitrary GT value from each row to aggregate over (you’re only joining by the locus/alleles key, not the sample key).

We don’t have a great onboarding experience for people thinking at this kind of level about the library, so do expect some pain like this in other areas as well! It’s quite a flexible library, which means it’s not easy to document the “one way” to do something.

I just want to toss in here that you basically never want .entries() because of the memory overhead involved in the coordinate table representation of a matrix table. That said, the Hail compiler, valiantly, attempts to eliminate the realization of said representation.