Gene-based sample statistics

Hi Hail,

This feels like a relatively simple aggregation/counting problem, yet I am struggling to find the right solution. I have a matrix table with rows annotated with gene names and variant consequences, and columns annotated with standard hl.sample_qc() output. I would like to transform this into a table whose rows represent genes, whose columns represent samples, and whose entries are counts of het genotypes. Basically, I need to count how many hets each sample has in each gene.
So far, the only way I have come up with is to loop through genes, e.g.:

mt_gene = mt.filter_rows(mt.varConsequence.SEVERE_GENE_NAME == 'PCSK9')
hets_per_gene = hl.sample_qc(mt_gene).sample_qc.n_het.collect()

but this seems like a very inefficient solution. Could you please suggest an optimal way to do this in Hail?

Thanks!
Nikita

This should involve group_rows_by / aggregate:

mt = mt.filter_rows(mt.varConsequence.is_severe)  # filter to just high-csq rows if you want
mt_gene = mt.group_rows_by(gene=mt.varConsequence.SEVERE_GENE_NAME) \
    .aggregate(n_het=hl.agg.count_where(mt.GT.is_het()))
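
If you'd rather not materialize a filtered matrix table first, the same counts can be computed in one pass by filtering inside the aggregation with hl.agg.filter. A minimal equivalent sketch, assuming the same varConsequence fields as in the snippets above:

mt_gene = mt.group_rows_by(gene=mt.varConsequence.SEVERE_GENE_NAME) \
    .aggregate(n_het=hl.agg.filter(mt.varConsequence.is_severe,
                                   hl.agg.count_where(mt.GT.is_het())))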

Thanks for the prompt response, Tim!
Unfortunately this command keeps failing on Google Dataproc (with preemptible workers). See the traceback below (briefly: FatalError: SparkException: Job aborted due to stage failure: ShuffleMapStage 51 (mapPartitions at ContextRDD.scala:217) has failed the maximum allowable number of times: 4).
The matrix table in this case has ~1 million variants and 14k samples.


FatalError                                Traceback (most recent call last)
/opt/conda/default/lib/python3.6/site-packages/IPython/core/formatters.py in __call__(self, obj)
    700                 type_pprinters=self.type_printers,
    701                 deferred_pprinters=self.deferred_printers)
--> 702             printer.pretty(obj)
    703             printer.flush()
    704             return stream.getvalue()

/opt/conda/default/lib/python3.6/site-packages/IPython/lib/pretty.py in pretty(self, obj)
    397                 if cls is not object \
    398                         and callable(cls.__dict__.get('__repr__')):
--> 399                     return _repr_pprint(obj, self, cycle)
    400
    401             return _default_pprint(obj, self, cycle)

/opt/conda/default/lib/python3.6/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    687     """A pprint that just redirects to the normal repr function."""
    688     # Find newlines and replace them with p.break_()
--> 689     output = repr(obj)
    690     for idx, output_line in enumerate(output.splitlines()):
    691         if idx:

/opt/conda/default/lib/python3.6/site-packages/hail/matrixtable.py in __repr__(self)
   2521
   2522     def __repr__(self):
-> 2523         return self.__str__()
   2524
   2525     def _repr_html_(self):

/opt/conda/default/lib/python3.6/site-packages/hail/matrixtable.py in __str__(self)
   2515
   2516     def __str__(self):
-> 2517         s = self.table_show.__str__()
   2518         if self.displayed_n_cols != self.actual_n_cols:
   2519             s += f"showing the first { self.displayed_n_cols } of { self.actual_n_cols } columns"

/opt/conda/default/lib/python3.6/site-packages/hail/table.py in __str__(self)
   1239
   1240     def __str__(self):
-> 1241         return self._ascii_str()
   1242
   1243     def __repr__(self):

/opt/conda/default/lib/python3.6/site-packages/hail/table.py in _ascii_str(self)
   1266             return s
   1267
-> 1268         rows, has_more, dtype = self.data()
   1269         fields = list(dtype)
   1270         trunc_fields = [trunc(f) for f in fields]

/opt/conda/default/lib/python3.6/site-packages/hail/table.py in data(self)
   1249             row_dtype = t.row.dtype
   1250             t = t.select(**{k: hl._showstr(v, self.truncate) for (k, v) in t.row.items()})
-> 1251             rows, has_more = t._take_n(self.n)
   1252             self._data = (rows, has_more, row_dtype)
   1253         return self._data

/opt/conda/default/lib/python3.6/site-packages/hail/table.py in _take_n(self, n)
   1373             has_more = False
   1374         else:
-> 1375             rows = self.take(n + 1)
   1376             has_more = len(rows) > n
   1377             rows = rows[:n]

</opt/conda/default/lib/python3.6/site-packages/decorator.py:decorator-gen-1065> in take(self, n, _localize)

/opt/conda/default/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586
    587     return wrapper

/opt/conda/default/lib/python3.6/site-packages/hail/table.py in take(self, n, _localize)
   2037         """
   2038
-> 2039         return self.head(n).collect(_localize)
   2040
   2041     @typecheck_method(n=int)

</opt/conda/default/lib/python3.6/site-packages/decorator.py:decorator-gen-1059> in collect(self, _localize)

/opt/conda/default/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586
    587     return wrapper

/opt/conda/default/lib/python3.6/site-packages/hail/table.py in collect(self, _localize)
   1836         e = construct_expr(ir, hl.tarray(t.row.dtype))
   1837         if _localize:
-> 1838             return Env.backend().execute(e._ir)
   1839         else:
   1840             return e

/opt/conda/default/lib/python3.6/site-packages/hail/backend/backend.py in execute(self, ir, timed)
    107
    108     def execute(self, ir, timed=False):
--> 109         result = json.loads(Env.hc()._jhc.backend().executeJSON(self._to_java_ir(ir)))
    110         value = ir.typ._from_json(result['value'])
    111         timings = result['timings']

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258
   1259         for temp_arg in temp_args:

/opt/conda/default/lib/python3.6/site-packages/hail/utils/java.py in deco(*args, **kwargs)
    223             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    224                              'Hail version: %s\n'
--> 225                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
    226         except pyspark.sql.utils.CapturedException as e:
    227             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: SparkException: Job aborted due to stage failure: ResultStage 46 (collect at ContextRDD.scala:223) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.FetchFailedException: Failure while fetching StreamChunkId{streamId=1674024014000, chunkIndex=0}: java.lang.RuntimeException: Executor is not registered (appId=application_1579888659531_0002, execId=1198) at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getBlockData(ExternalShuffleBlockResolver.java:177) at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler$ManagedBufferIterator.next(ExternalShuffleBlockHandler.java:239)
etc…

Ah, yeah, you’ll need to run this on non-preemptibles. Big shuffles like this go really badly when nodes are gradually preempted.
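
It can also help to write the grouped result out and read it back once the shuffle succeeds, so later actions (show, export, etc.) don't re-run it. A minimal sketch, with a hypothetical output path:

mt_gene = mt_gene.checkpoint('gs://my-bucket/hets_per_gene.mt', overwrite=True)  # writes once, then reads back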

Worked just fine, thanks!
One more quick question:
The resulting table is a MatrixTable. Is there any way I can get exactly this table in a data-frame-like form, saved as a single tab-separated file (i.e. importable into R)?
It seems like .make_table() converts this to a Hail Table, but I am not sure where to take it from there.

Does mt.n_het.export(file...) give you something formatted properly?
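
For context, calling .export() on an entry field writes the matrix shape directly: the row (key) fields as leading columns, then one column per sample. A minimal sketch, assuming the grouped mt_gene from above and a hypothetical output path:

mt_gene.n_het.export('gs://my-bucket/hets_per_gene.tsv')

The resulting TSV has one row per gene and one column per sample, which read.delim() in R can load directly.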

Yes, this did solve the issue. Thanks!
I will just leave a link to the relevant thread below for future readers.