LASSO / elastic net regression on blocks of variants

from @tshmak:

By “block”, I was referring to LD blocks, i.e. non-overlapping regions of the genome. Experience has told us that running a lasso or an elastic net on the entire dataset is not only slow but also not as good as breaking it down into LD blocks. The size of a block is usually within a few thousand SNPs, so the whole thing can still be handled by pandas. I got as far as using make_table and to_spark to export a subset of the data to Spark, but am not sure how to go about parallelizing the blocks across the genome. Say we have 1000 LD blocks and 2500 partitions. Is it possible to get the executors to process blocks instead of SNPs? Thanks a lot!

I have a few ideas:

  1. We could expose something that writes out a VCF per partition and runs a user-specified tool on the VCF, collecting results. This isn’t my favorite option, and it seems hacky.
  2. We implement LASSO / elastic net methods on MatrixTable. Then, we implement a GroupedMatrixTable that can apply any MatrixTable operation to row / col groups. This makes the entire thing basically two lines of code, but we’re a long way from being able to do this.
  3. We add ndarrays as a Hail type, and implement regularized regression methods on ndarrays. We can then easily add some operation which would convert your MatrixTable to a Table with a gene field and a data field (ndarray), where you can do the regression trivially per record.

I think number 3 is feasible on the ~6 month time scale.
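To make option 3 concrete: once a block’s genotypes live in a single ndarray, the per-record regression really is trivial. A minimal sketch, using closed-form ridge regression as a stand-in for the regularized fit (the record layout, helper names, and lambda here are illustrative, not a proposed API):

```python
import numpy as np

def ridge_fit(X, y, lam=1.0):
    # closed-form ridge estimate: (X'X + lam*I)^{-1} X'y
    p = X.shape[1]
    return np.linalg.solve(X.T @ X + lam * np.eye(p), X.T @ y)

rng = np.random.RandomState(0)
# pretend each record is (block_id, genotype ndarray, phenotype vector),
# i.e. the (gene field, data field) rows the hypothetical Table would hold
records = [("block_%d" % i, rng.randn(50, 5), rng.randn(50)) for i in range(3)]

# the regression is then one expression per record
fits = {block_id: ridge_fit(X, y) for block_id, X, y in records}
```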

Breeze implements Lasso using this coordinate-descent approach:
ftp://ftp.math.ucla.edu/pub/camreport/cam09-17.pdf
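The coordinate-descent idea in that paper can be sketched in a few lines of numpy. This is an illustrative toy, not Breeze’s actual implementation; the function names, objective scaling, and iteration count are my own choices:

```python
import numpy as np

def soft_threshold(z, gamma):
    # soft-thresholding operator: closed-form solution of the
    # one-dimensional lasso subproblem
    return np.sign(z) * np.maximum(np.abs(z) - gamma, 0.0)

def lasso_cd(X, y, alpha, n_iter=100):
    # cyclic coordinate descent for
    #   min_b (1/2n) * ||y - Xb||^2 + alpha * ||b||_1
    # assumes no all-zero columns in X
    n, p = X.shape
    beta = np.zeros(p)
    col_sq = (X ** 2).sum(axis=0)  # per-coordinate curvature
    for _ in range(n_iter):
        for j in range(p):
            # partial residual with coordinate j removed
            r = y - X @ beta + X[:, j] * beta[j]
            beta[j] = soft_threshold(X[:, j] @ r / n, alpha) / (col_sq[j] / n)
    return beta
```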

Hi,
I’m actually more keen on option 1 because it seems the most general. You guys can implement options 2 and/or 3 if there is need later.

One of the problems with options 2 and 3 is that it’s currently not clear what the best way is to run lasso/elastic net on this kind of data. Splitting it by LD block is my approach, but the disadvantage is that it’s unclear how best to define these blocks. Others may prefer a window approach, in which case the groups will be overlapping, so it’s best to take these various possibilities into account before embarking on 2 or 3.

Can 1 be implemented quickly? Say in a month or so?

BTW I prefer plink format to vcf.

[modified to reflect that you should definitely try @danking’s solution below first!] One approach would be to use Hail to organize/QC the genetic data, filter (prune?) to those variants you want overall, define your LD blocks, and then export the genetic data for each LD block in a format that can be read into numpy/pandas. You can then run a Python script for LASSO on each block as a batch of jobs on your local cluster or the cloud (this would be outside of Hail, which does not currently run Python on the workers).

Version 1 of the parallel export step is in this PR, which should hopefully merge next week (though you’re welcome to try the branch now):
https://github.com/hail-is/hail/pull/3539

You’d write your MatrixTable as a BlockMatrix using write_from_entry_expr with force_row_major=True and then export a TSV for each region of interest in parallel using export_rectangles (see function in PR). These regions can even overlap. If you have missing genotypes, you’ll want to annotate rows with means first and then use an expression like hl.or_else(mt.GT.n_alt_alleles(), mt.mean) to do mean imputation when converting to BlockMatrix. This PR doesn’t do compression automatically, but you can gzip the output TSVs (which should compress very well if nearly all values are 0.0, 1.0, and 2.0), and those can be read into numpy/pandas.
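If you’d rather impute after export instead, the same per-variant mean imputation is easy on the numpy side. A sketch, assuming missing genotypes arrive as NaN and rows are variants (the helper name is mine):

```python
import numpy as np

def mean_impute(G):
    # replace NaN entries with each variant's (row's) mean dosage,
    # mirroring what hl.or_else(mt.GT.n_alt_alleles(), mt.mean) does in Hail
    G = G.astype(float).copy()
    row_means = np.nanmean(G, axis=1)
    nan_rows, nan_cols = np.where(np.isnan(G))
    G[nan_rows, nan_cols] = row_means[nan_rows]
    return G
```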

There’s some wastefulness in going through BlockMatrix here, since you aren’t actually taking advantage of the block partitioning to do linear algebra, just exporting with respect to the original row partitioning (the motivating application for export_rectangles was exporting blocks of the banded LD matrix, computed as X X^T). We’ll consider exposing more direct control of parallel TSV export on MatrixTable.

Hey @tshmak!

Let me address your comment point-wise:

Regarding Option One

We really want to avoid Option 1 because its performance will never meet our standards. In particular, consider that you have to write all of this data out to disk and then read it back into whatever program you want to execute. Moreover, even achieving that disappointing performance requires some non-trivial engineering effort.

Windowed Operations

Sliding windows are definitely on our roadmap. They’re more of a medium-term thing because they require some non-trivial engineering work.

Your Particular Problem

I think your best choice is to use Hail to group your data by LD Block and then use PySpark to execute the LASSO or elastic net regressions in parallel on the cluster.

import hail as hl
mt = hl.read_matrix_table(...)
gts_as_rows = mt.annotate_rows(
  # collect numeric dosages rather than calls, so sklearn can fit them
  genotypes = hl.agg.collect(hl.float(mt.GT.n_alt_alleles()))
).rows()

groups = gts_as_rows.group_by(
  ld_block = ...
).aggregate(
  genotypes = hl.agg.collect(gts_as_rows.genotypes)
, ys = hl.agg.collect(gts_as_rows.value_to_predict)
)

This gets you a table with a genotypes field that is an array of arrays; each inner array is one variant’s genotype row.

df = groups.to_spark()

from pyspark.sql.functions import udf

def get_intercept(X, y):
    from sklearn import linear_model
    clf = linear_model.Lasso(alpha=0.1)
    clf.fit(X, y)
    return float(clf.intercept_)

get_intercept_udf = udf(get_intercept)
df.select(get_intercept_udf("genotypes", "ys").alias("intercept")).show()

This second block actually fits your model.


And here’s a full working example with a randomly generated dataset:

import hail as hl
mt = hl.balding_nichols_model(3, 100, 100)
gts_as_rows = mt.annotate_rows(
  mean = hl.agg.mean(hl.float(mt.GT.n_alt_alleles()))
, genotypes = hl.agg.collect(hl.float(mt.GT.n_alt_alleles()))
).rows()
groups = gts_as_rows.group_by(
  ld_block = gts_as_rows.locus.position // 10
).aggregate(
  genotypes = hl.agg.collect(gts_as_rows.genotypes)
, ys = hl.agg.collect(gts_as_rows.mean)
)

df = groups.to_spark()

from pyspark.sql.functions import udf

def get_intercept(X, y):
    from sklearn import linear_model
    clf = linear_model.Lasso(alpha=0.1)
    clf.fit(X, y)
    return float(clf.intercept_)

get_intercept_udf = udf(get_intercept)
df.select(get_intercept_udf("genotypes", "ys").alias("intercept")).show()

@danking @jbloom Thanks so much for your answers!!