LASSO / elastic net regression on blocks of variants

Hey @tshmak!

Let me address your comments point by point:

Regarding Option One

We really want to avoid Option 1 because its performance will never meet our standards: you have to write all of this data out to disk and then read it back into whatever program you want to execute. Worse, even that disappointing performance takes non-trivial engineering effort to achieve.

Windowed Operations

Sliding windows are definitely on our roadmap. They’re more of a medium-term thing because they require some non-trivial engineering work.

Your Particular Problem

I think your best bet is to use Hail to group your data by LD block and then use PySpark to run the LASSO or elastic net regressions in parallel on the cluster.

import hail as hl

mt = hl.read_matrix_table(...)

# Collect each variant's genotypes, converted to numeric alternate-allele
# counts (sklearn can't fit on raw calls), into an array field, then drop
# down to a row table.
gts_as_rows = mt.annotate_rows(
  genotypes = hl.agg.collect(hl.float(mt.GT.n_alt_alleles()))
).rows()

# Group the variants by LD block, collecting the per-variant genotype
# arrays and the response values for each block.
groups = gts_as_rows.group_by(
  ld_block = ...
).aggregate(
  genotypes = hl.agg.collect(gts_as_rows.genotypes)
, ys = hl.agg.collect(gts_as_rows.value_to_predict)
)

This gets you a table with a genotypes field that is an array of arrays; each inner array is one variant's row of genotypes.
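If you want to sanity-check the grouping before fitting anything, Hail tables have describe and show, so something like this should do:

# Inspect the grouped table before converting to Spark (optional).
groups.describe()   # prints the schema: ld_block, genotypes, ys
groups.show(5)      # displays the first few LD blocks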

df = groups.to_spark()

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def get_intercept(X, y):
    # Import sklearn inside the UDF so the import happens on the executors.
    from sklearn import linear_model
    clf = linear_model.Lasso(alpha=0.1)
    clf.fit(X, y)
    return float(clf.intercept_)

# Declare the return type explicitly; udf defaults to strings otherwise.
get_intercept_udf = udf(get_intercept, DoubleType())
df.select(get_intercept_udf("genotypes", "ys").alias("intercept")).show()

This second block actually fits your models, one LASSO per LD block.
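If you want the elastic net instead of the LASSO, or the fitted coefficients rather than just the intercept, the same pattern works. Here's a sketch; the get_coefs name and the alpha and l1_ratio values are just illustrative:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DoubleType

def get_coefs(X, y):
    # Elastic net instead of LASSO; l1_ratio trades off the L1 vs. L2 penalty.
    from sklearn import linear_model
    clf = linear_model.ElasticNet(alpha=0.1, l1_ratio=0.5)
    clf.fit(X, y)
    return [float(c) for c in clf.coef_]

# Return an array of doubles, one coefficient per column of X.
get_coefs_udf = udf(get_coefs, ArrayType(DoubleType()))
df.select("ld_block", get_coefs_udf("genotypes", "ys").alias("coefs")).show()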


And here’s a full working example with a randomly generated dataset:

import hail as hl

# Simulate a small dataset: 3 populations, 100 samples, 100 variants.
mt = hl.balding_nichols_model(3, 100, 100)

# Per variant: the mean alternate-allele count (used here as a stand-in
# response) and the full array of alternate-allele counts.
gts_as_rows = mt.annotate_rows(
  mean = hl.agg.mean(hl.float(mt.GT.n_alt_alleles()))
, genotypes = hl.agg.collect(hl.float(mt.GT.n_alt_alleles()))
).rows()

# Group variants into "LD blocks" of ten consecutive positions.
groups = gts_as_rows.group_by(
  ld_block = gts_as_rows.locus.position // 10
).aggregate(
  genotypes = hl.agg.collect(gts_as_rows.genotypes)
, ys = hl.agg.collect(gts_as_rows.mean)
)

df = groups.to_spark()

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

def get_intercept(X, y):
    # Import sklearn inside the UDF so the import happens on the executors.
    from sklearn import linear_model
    clf = linear_model.Lasso(alpha=0.1)
    clf.fit(X, y)
    return float(clf.intercept_)

get_intercept_udf = udf(get_intercept, DoubleType())
df.select(get_intercept_udf("genotypes", "ys").alias("intercept")).show()
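If you'd rather have the per-block results back in Python than just printed, toPandas() on the result should be fine, since there's only one row per LD block:

# Pull the per-block intercepts back to the driver as a pandas DataFrame.
result = df.select(
    "ld_block",
    get_intercept_udf("genotypes", "ys").alias("intercept")
).toPandas()
print(result.head())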