FatalError: StackOverflowError: null when combining single sample vcfs

Hello,
I have many (>10,000) single-sample array VCF files that I would like to process through Hail. My initial approach was to combine them into a single matrix table, but according to the documentation that feature is not yet supported, so I attempted to combine them with union_cols instead. The lines below execute fine for a small number of samples (around 10), but as soon as I scale up to 100 I get the error shown below. I left out most of the traceback because it is very long, but I can provide it if necessary. If there is an alternative approach to combining single-sample VCFs with Hail, I would be open to that as well.

for file in filenames[:100]:
    patient_id = file.split('/')[-1].split('.')[0]
    patient_mt = hl.methods.import_vcf(file)
    patient_mt_filtered = patient_mt.filter_rows(patient_mt.locus == vkorc1_locus, keep=True)
    patient_mt_filtered = patient_mt_filtered.key_cols_by(s = patient_id)
    try:
        all_array_pts = all_array_pts.union_cols(patient_mt_filtered)
    except NameError:
        all_array_pts = patient_mt_filtered

# Concordance calculation
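# summary is a 5x5 matrix of joint genotype counts (left dataset on rows, right on columns);
# indices: 0 = no data, 1 = no call, 2 = hom ref, 3 = het, 4 = hom var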
summary, samples, variants = hl.concordance(all_array_pts, vkorc1_mt)
left_homref_right_homvar = summary[2][4]
left_het_right_missing = summary[3][1]
left_het_right_something_else = sum(summary[3][:]) - summary[3][3]
total_concordant = summary[2][2] + summary[3][3] + summary[4][4]
total_discordant = sum([sum(s[2:]) for s in summary[2:]]) - total_concordant
---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-91-f8b5ba9910be> in <module>
----> 1 summary, samples, variants = hl.concordance(all_array_pts, vkorc1_mt)
      2 left_homref_right_homvar = summary[2][4]
      3 left_het_right_missing = summary[3][1]
      4 left_het_right_something_else = sum(summary[3][:]) - summary[3][3]
      5 total_concordant = summary[2][2] + summary[3][3] + summary[4][4]

<decorator-gen-1734> in concordance(left, right, _localize_global_statistics)

/opt/conda/lib/python3.7/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper

/opt/conda/lib/python3.7/site-packages/hail/methods/qc.py in concordance(left, right, _localize_global_statistics)
    411     require_col_key_str(right, 'concordance, right')
    412 
--> 413     left_sample_counter = left.aggregate_cols(hl.agg.counter(left.col_key[0]))
    414     right_sample_counter = right.aggregate_cols(hl.agg.counter(right.col_key[0]))
    415 

<decorator-gen-1214> in aggregate_cols(self, expr, _localize)

/opt/conda/lib/python3.7/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    575     def wrapper(__original_func, *args, **kwargs):
    576         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577         return __original_func(*args_, **kwargs_)
    578 
    579     return wrapper

/opt/conda/lib/python3.7/site-packages/hail/matrixtable.py in aggregate_cols(self, expr, _localize)
   2035         agg_ir = ir.TableAggregate(ir.MatrixColsTable(base._mir), subst_query)
   2036         if _localize:
-> 2037             return Env.backend().execute(agg_ir)
   2038         else:
   2039             return construct_expr(ir.LiftMeOut(agg_ir), expr.dtype)

/opt/conda/lib/python3.7/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
     96                 raise HailUserError(message_and_trace) from None
     97 
---> 98             raise e

/opt/conda/lib/python3.7/site-packages/hail/backend/py4j_backend.py in execute(self, ir, timed)
     72         # print(self._hail_package.expr.ir.Pretty.apply(jir, True, -1))
     73         try:
---> 74             result = json.loads(self._jhc.backend().executeJSON(jir))
     75             value = ir.typ._from_json(result['value'])
     76             timings = result['timings']

/opt/conda/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1308         answer = self.gateway_client.send_command(command)
   1309         return_value = get_return_value(
-> 1310             answer, self.gateway_client, self.target_id, self.name)
   1311 
   1312         for temp_arg in temp_args:

/opt/conda/lib/python3.7/site-packages/hail/backend/py4j_backend.py in deco(*args, **kwargs)
     30                 raise FatalError('%s\n\nJava stack trace:\n%s\n'
     31                                  'Hail version: %s\n'
---> 32                                  'Error summary: %s' % (deepest, full, hail.__version__, deepest), error_id) from None
     33         except pyspark.sql.utils.CapturedException as e:
     34             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: StackOverflowError: null

Java stack trace:
is.hail.utils.HailException: error after applying LowerMatrixToTable
	at is.hail.utils.ErrorHandling$class.fatal(ErrorHandling.scala:15)
	at is.hail.utils.package$.fatal(package.scala:77)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:24)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:12)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:12)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:28)
	at is.hail.backend.spark.SparkBackend.is$hail$backend$spark$SparkBackend$$_execute(SparkBackend.scala:360)
	at is.hail.backend.spark.SparkBackend$$anonfun$execute$1.apply(SparkBackend.scala:344)
	at is.hail.backend.spark.SparkBackend$$anonfun$execute$1.apply(SparkBackend.scala:341)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:25)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:23)
	at is.hail.utils.package$.using(package.scala:618)
	at is.hail.annotations.RegionPool$.scoped(RegionPool.scala:13)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:23)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:254)
	at is.hail.backend.spark.SparkBackend.execute(SparkBackend.scala:341)
	at is.hail.backend.spark.SparkBackend$$anonfun$7.apply(SparkBackend.scala:385)
	at is.hail.backend.spark.SparkBackend$$anonfun$7.apply(SparkBackend.scala:383)
	at is.hail.utils.ExecutionTimer$.time(ExecutionTimer.scala:52)
	at is.hail.backend.spark.SparkBackend.executeJSON(SparkBackend.scala:383)
	at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Thanks,
Andrew

Hi Andrew,

Unfortunately, a huge union_cols is something we don’t support well at the moment. It’s on our radar, but in the meantime I’d suggest unioning in batches: for example, if you can union 100 files successfully, write out matrix tables containing 100 columns each, then union those in batches of 100, and so on. A minimal sketch of one level of that batching is below.
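Roughly (this is only a sketch, assuming `filenames` is your list of VCF paths as in the loop above and `bucket` is a hypothetical writable output location):

import hail as hl

batch_paths = []
batch_size = 100
for start in range(0, len(filenames), batch_size):
    # per-file filtering / re-keying from your original loop would go here
    mts = [hl.import_vcf(f) for f in filenames[start:start + batch_size]]
    combined = mts[0]
    for mt in mts[1:]:
        combined = combined.union_cols(mt)
    out_path = f"{bucket}/union_batches/batch_{start}.mt"
    # writing out the batch truncates the pipeline before the next round of unions
    combined.write(out_path, overwrite=True)
    batch_paths.append(out_path)

# Then read the written matrix tables back with hl.read_matrix_table and repeat
# the same loop over batch_paths until only one matrix table remains.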

Hey Patrick,
Thanks for the quick response. That was going to be my next approach unless someone had a better idea. I’ll try it out and follow up with how it went.
Thanks again for your help.

For anyone who might need it, this is the approach I used. It works on small datasets but does not scale up well.

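# Assumes Hail has been imported as `hl` and `bucket` is a writable output path prefix.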
def get_table(file):
    if file.endswith("vcf"):
        patient_id = file.split('/')[-1].split('.')[0]
        mt = hl.methods.import_vcf(file)
        mt = mt.key_cols_by(s = patient_id)
    else:
        mt = hl.read_matrix_table(file)
    return mt
    
def build_tables(files, counter=1):
    # counter is only used to create the intermediate filenames so that the same
    # name is never reused within the Hail query; it has no other meaning
    tables = []
    n = 5  # batch size: number of files to union before writing a checkpoint
    if len(files) > 1:
        for i, file in enumerate(files, 1):
            mt = get_table(file)
            try:
                combined_mt = combined_mt.union_cols(mt)
            except NameError:
                combined_mt = mt
            if i % n == 0:
                table_path = f"{bucket}/hail_array_tables/array_table_{counter}.mt"
                combined_mt.write(table_path, overwrite=True)
                tables.append(table_path)
                del combined_mt
            counter += 1
        # This is needed when the number of files is not evenly divisible by n
        if len(files) % n != 0:
            table_path = f"{bucket}/hail_array_tables/array_table_{counter}.mt"
            combined_mt.write(table_path, overwrite=True)
            tables.append(table_path)
            del combined_mt
        return build_tables(tables, counter=counter)
    else:
        return files[0]
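It can be called along these lines (the function returns a path, so the result still needs to be read back in; `filenames` is the full list of single-sample VCF paths as above):

final_path = build_tables(filenames)
all_array_pts = get_table(final_path)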