Densify running out of memory

I’m running into an out of memory error when trying to densify a MatrixTable:

[Stage 15:===>                                            (2002 + 3006) / 30000]OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007fb132d80000, 4354736128, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 4354736128 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /tmp/c98e624072f34b96a616efa158df27f5/hs_err_pid31285.log

Here is the code:

def main(args):
    hl.init(log="/create_qc_data.log", default_reference="GRCh38")

    data_source = "broad"
    freeze = args.freeze

    if args.compute_qc_mt:
        logger.info("Reading in raw MT...")
        mt = get_ukbb_data(
            data_source,
            freeze,
            key_by_locus_and_alleles=True,
            split=False,
            raw=True,
            repartition=args.repartition,
            n_partitions=args.raw_partitions,
        )
        mt = mt.select_entries("LGT", "GQ", "DP", "LAD", "LA", "END")

        logger.info("Reading in QC MT sites from tranche 2/freeze 5...")
        if not hl.utils.hadoop_exists(f"{qc_sites_path()}/_SUCCESS"):
            get_qc_mt_sites()
        qc_sites_ht = hl.read_table(qc_sites_path())
        logger.info(f"Number of QC sites: {qc_sites_ht.count()}")

        logger.info("Densifying sites...")
        last_END_ht = hl.read_table(last_END_positions_ht_path(freeze))
        mt = densify_sites(mt, qc_sites_ht, last_END_ht)

        logger.info("Checkpointing densified MT")
        mt = mt.checkpoint(
            get_checkpoint_path(
                data_source, freeze, name="dense_qc_mt_v2_sites", mt=True
            ),
            overwrite=True,
        )

        logger.info("Repartitioning densified MT")
        mt = mt.naive_coalesce(args.n_partitions)
        mt = mt.checkpoint(
            get_checkpoint_path(
                data_source, freeze, name="dense_qc_mt_v2_sites.repartitioned", mt=True,
            ),
            overwrite=True,
        )

        # NOTE: Need MQ, QD, FS for hard filters
        logger.info("Adding info and low QUAL annotations and filtering to adj...")
        info_expr = get_site_info_expr(mt)
        info_expr = info_expr.annotate(**get_as_info_expr(mt))
        mt = mt.annotate_rows(info=info_expr)
        mt = mt.annotate_entries(GT=hl.experimental.lgt_to_gt(mt.LGT, mt.LA))
        mt = mt.select_entries("GT", adj=get_adj_expr(mt.LGT, mt.GQ, mt.DP, mt.LAD))
        mt = filter_to_adj(mt)

        logger.info("Checkpointing MT...")
        mt = mt.checkpoint(
            get_checkpoint_path(
                data_source,
                freeze,
                name=f"{data_source}.freeze_{freeze}.qc_sites.mt",
                mt=True,
            ),
            overwrite=True,
        )

densify_sites: https://github.com/broadinstitute/gnomad_methods/blob/master/gnomad/utils/sparse_mt.py#L62

I added two checkpoints to densify_sites, one for sites_ht and one for mt. After the densify crashed, I re-ran the code using the checkpointed data and got the same error. The log doesn’t seem to want to attach, but I can send it via Slack.
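
For reference, the two checkpoints look roughly like this (just a sketch of the pattern; the paths are placeholders, not the ones I actually used):

# Inside densify_sites: one checkpoint for the sites table...
sites_ht = sites_ht.checkpoint(
    "gs://placeholder-bucket/tmp/sites_checkpoint.ht", overwrite=True
)

# ...and one for the MatrixTable partway through the function.
mt = mt.checkpoint(
    "gs://placeholder-bucket/tmp/mt_checkpoint.mt", overwrite=True
)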

I have already run a densify on the same input MT successfully for another task. Could you help me figure out why this smaller densify on fewer rows is running out of memory (same cluster configuration)?

This one I think should also be fixed by 0.2.35. The commit that broke it was the last commit before we released 0.2.34 :upside_down_face:

We found a separate memory leak in 0.2.35. Should actually be fixed in 0.2.36, which we’ll try to release today.

Thanks for the fix!! The densified MT wrote successfully.

I have to re-run a separate step that densifies and am getting this error again:

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007efb7bb80000, 2912944128, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 2912944128 bytes for committing reserved memory.

Code:

def compute_interval_callrate_dp_mt(
    data_source: str,
    freeze: int,
    mt: hl.MatrixTable,
    intervals_ht: hl.Table,
    bi_allelic_only: bool = True,
    autosomes_only: bool = True,
    target_pct_gt_cov: List = [10, 20],
) -> None:
    """
    Computes sample metrics (n_defined, total, mean_dp, pct_gt_20x, pct_dp_defined) per interval.

    Mean depth, pct_gt_{target_pct_gt_cov}x, and pct_dp_defined annotations are used during interval QC.
    Mean depth and callrate annotations (mean_dp, n_defined, total) are used during hard filtering.
    Callrate annotations (n_defined, total) are also used during platform PCA.
    Writes call rate mt (aggregated MatrixTable) keyed by intervals row-wise and samples column-wise.
    Assumes that all overlapping intervals in intervals_ht are merged.
    NOTE: This function requires a densify! Please use an autoscaling cluster.

    :param str data_source: One of 'regeneron' or 'broad'
    :param int freeze: One of the data freezes
    :param MatrixTable mt: Input MatrixTable.
    :param Table intervals_ht: Table with capture intervals relevant to input MatrixTable.
    :param bool bi_allelic_only: If set, only bi-allelic sites are used for the computation.
    :param bool autosomes_only: If set, only autosomal intervals are used.
    :param List target_pct_gt_cov: Coverage levels to check for each target. Default is [10, 20].
    :return: None
    """
    logger.warning(
        "This function will densify! Make sure you have an autoscaling cluster."
    )
    logger.info("Computing call rate and mean DP MatrixTable...")
    if len(intervals_ht.key) != 1 or not isinstance(
        intervals_ht.key[0], hl.expr.IntervalExpression
    ):
        logger.warning(
            f"Call rate matrix computation expects `intervals_ht` with a key of type Interval. Found: {intervals_ht.key}"
        )
    logger.info("Densifying...")
    mt = mt.drop("gvcf_info")
    mt = hl.experimental.densify(mt)

    logger.info(
        "Filtering out lines that are only reference or not covered in capture intervals"
    )
    logger.warning(
        "Assumes that overlapping intervals in capture intervals are merged!"
    )
    mt = mt.filter_rows(hl.len(mt.alleles) > 1)
    intervals_ht = intervals_ht.annotate(interval_label=intervals_ht.interval)
    mt = mt.annotate_rows(interval=intervals_ht.index(mt.locus).interval_label)
    mt = mt.filter_rows(hl.is_defined(mt.interval))

    if autosomes_only:
        mt = filter_to_autosomes(mt)

    if bi_allelic_only:
        mt = mt.filter_rows(bi_allelic_expr(mt))

    logger.info(
        "Grouping MT by interval and calculating n_defined, total, and mean_dp..."
    )
    mt = mt.annotate_entries(GT=hl.experimental.lgt_to_gt(mt.LGT, mt.LA))
    mt = mt.select_entries(
        GT=hl.or_missing(hl.is_defined(mt.GT), hl.struct()),
        DP=hl.if_else(hl.is_defined(mt.DP), mt.DP, 0),
    )
    mt = mt.group_rows_by(mt.interval).aggregate(
        n_defined=hl.agg.count_where(hl.is_defined(mt.GT)),
        total=hl.agg.count(),
        dp_sum=hl.agg.sum(mt.DP),
        mean_dp=hl.agg.mean(mt.DP),
        **{
            f"pct_gt_{cov}x": hl.agg.fraction(mt.DP >= cov) for cov in target_pct_gt_cov
        },
        pct_dp_defined=hl.agg.count_where(mt.DP > 0) / hl.agg.count(),
    )
    mt.write(callrate_mt_path(data_source, freeze, interval_filtered=False))

and

logger.warning("Computing the call rate MT requires a densify!\n")
logger.info("Reading in raw MT...")
mt = get_ukbb_data(
    data_source,
    freeze,
    split=False,
    key_by_locus_and_alleles=True,
    raw=True,
    repartition=args.repartition,
    n_partitions=n_partitions,
)
capture_ht = hl.read_table(capture_ht_path(data_source))
compute_interval_callrate_dp_mt(
    data_source,
    freeze,
    mt,
    capture_ht,
    autosomes_only=False,
    target_pct_gt_cov=target_pct_gt_cov,
)

I will send the log via email

this is 0.2.36, right?

yup!

I tried running the code again on 0.2.37 using 15,000 partitions (instead of the 30,000 I used three days ago). The code got farther, but I got this error:

hail.utils.java.FatalError: SparkException: Job aborted due to stage failure: Task 6938 in stage 14.0 failed 20 times, most recent failure: Lost task 6938.19 in stage 14.0 (TID 39393, kc1-w-103.c.maclab-ukbb.internal, executor 1539): ExecutorLostFailure (executor 1539 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits.  20.0 GB of 20 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

Do you have any guidance for cluster configuration? I used 300 highmem-8 workers.

@cdv can you look into this?

As an update, I tried using a larger cluster on 0.2.37, which still didn’t work. I reverted to 0.2.34, and somehow the job ran successfully. I will email the log.

I just spoke with @ch-kr. I asked them to run the computation again without the densify, and it OOMed again. This leads me to believe that the issue is in group_rows_by rather than densify. I’m going to look into the memory usage there. In the meantime, we expose a hidden parameter for Table’s group-by aggregation that allows explicit control over the buffer used in the aggregation; I’m going to extend MatrixTable to have the same functionality.

Hi hail team,

I’m getting another out of memory error with a densify step. I’m trying to calculate frequency information on the 300K and got this error:

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f7ae9680000, 13452181504, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 13452181504 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /tmp/eb655710859a4c968045f5a9c6792931/hs_err_pid9139.log

I will email the code and log, since the log is too big to attach here.

If you’re seeing this message, it means that the master is running out of memory, probably because of the very wide arrays involved in the densify scan.

What is the type of the entries that are being densified? Is it just GT, adj, and END?

There may be a few things that can be done:

  1. Increase master memory. (Are you just using an n1-highmem-8 master?)
  2. Reduce the number of scan states that can be resident in memory at once. We have a context flag called max_leader_scans that is set to 1000 by default, which may be too many for a table as wide as this. To set the flag, use hl._set_flags(max_leader_scans='200'); note the string argument, as it will not work with an integer (see the sketch below).
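
For concreteness, here is a minimal sketch of setting the flag in a driver script (the init arguments are placeholders for whatever you already pass to hl.init):

import hail as hl

# Placeholder init arguments; keep whatever you already use.
hl.init(log="/generate_frequency_data.log", default_reference="GRCh38")

# Limit how many scan states the leader (master) keeps in memory at once.
# The value must be passed as a string; an integer will not work.
hl._set_flags(max_leader_scans='200')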

Thanks for looking into this, Chris!

  • Entries: yes, the MT only has GT, adj, and END.
  • Master memory: I tried using an n1-highmem-16 master for that run.
  • Scans: Cool, thank you! I’ll try adding this.

Should I switch to an n1-highmem-32 master and add the max_leader_scans flag? Or should I start with just adding the context flag?

I’m not sure; I think the flag setting should be more impactful for memory use, but doing both won’t hurt.

I tried both things and got this:

Hail version: 0.2.44-6cfa355a1954
Error summary: IOException: All datanodes [DatanodeInfoWithStorage[10.128.0.115:9866,DS-383f3dfd-0aca-45a1-ad53-9296df15dfdb,DISK]] are bad. Aborting...
ERROR: (gcloud.dataproc.jobs.submit.pyspark) Job [290db16698204071a9131931d2ed97f8] failed with error:
Google Cloud Dataproc Agent reports job failure. If logs are available, they can be found at:
https://console.cloud.google.com/dataproc/jobs/290db16698204071a9131931d2ed97f8?project=maclab-ukbb&region=us-central1
https://console.cloud.google.com/storage/browser/dataproc-1aca38e4-67fe-4b64-b451-258ef1aea4d1-us-central1/google-cloud-dataproc-metainfo/f0f09d9a-8fec-4fa5-94ef-f19ae1681071/jobs/290db16698204071a9131931d2ed97f8/
gs://dataproc-1aca38e4-67fe-4b64-b451-258ef1aea4d1-us-central1/google-cloud-dataproc-metainfo/f0f09d9a-8fec-4fa5-94ef-f19ae1681071/jobs/290db16698204071a9131931d2ed97f8/driveroutput

I’ll send the log via email

What are your primary worker settings? We use HDFS for storage of scan intermediates, and you may be running out of disk space in temporary storage.
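
If HDFS space does turn out to be the problem, one thing that might be worth trying is pointing Hail’s temporary directory at a GCS bucket instead. This is only a sketch: the bucket path is a placeholder, and I’m assuming the scan intermediates follow hl.init’s tmp_dir on this version.

import hail as hl

# Placeholder bucket; assumes scan intermediates are written under tmp_dir
# rather than always to cluster-local HDFS.
hl.init(
    log="/generate_frequency_data.log",
    default_reference="GRCh38",
    tmp_dir="gs://placeholder-bucket/hail-tmp/",
)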

I used this command:

hailctl dataproc start kc1 --master-machine-type n1-highmem-32 --worker-machine-type n1-highmem-8 --init gs://gnomad-public/tools/inits/master-init.sh --master-boot-disk-size 600 --worker-boot-disk-size 550 --project maclab-ukbb --max-idle=30m --properties=spark:spark.executor-memory=35g,spark:spark.speculation=true,spark:spark.speculation.quantile=0.9,spark:spark.speculation.multiplier=3 --num-workers=300 --packages gnomad

Should I have used a different parameter or given more disk space?

After digging into this, I’m baffled. I don’t know what resource limit is being exhausted here. We tried increasing the open file limit on the master and reducing the number of nodes, but nothing seems to be working. Here’s the full stack trace.

Traceback (most recent call last):
  File "/tmp/0fce35ede52f4f979a5868df49180d1c/generate_frequency_data.py", line 338, in <module>
    main(args)
  File "/tmp/0fce35ede52f4f979a5868df49180d1c/generate_frequency_data.py", line 267, in main
    platform_expr,
  File "/tmp/0fce35ede52f4f979a5868df49180d1c/generate_frequency_data.py", line 135, in generate_frequency_data
    overwrite=overwrite,
  File "<decorator-gen-1077>", line 2, in write
  File "/opt/conda/default/lib/python3.6/site-packages/hail/typecheck/check.py", line 614, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/opt/conda/default/lib/python3.6/site-packages/hail/table.py", line 1223, in write
    Env.backend().execute(ir.TableWrite(self._tir, ir.TableNativeWriter(output, overwrite, stage_locally, _codec_spec)))
  File "/opt/conda/default/lib/python3.6/site-packages/hail/backend/spark_backend.py", line 296, in execute
    result = json.loads(self._jhc.backend().executeJSON(jir))
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/conda/default/lib/python3.6/site-packages/hail/backend/spark_backend.py", line 41, in deco
    'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
hail.utils.java.FatalError: IOException: All datanodes [DatanodeInfoWithStorage[10.128.0.39:9866,DS-1574e23a-74ad-446e-b006-00a5b93f1095,DISK]] are bad. Aborting...

Java stack trace:
java.lang.RuntimeException: error while applying lowering 'InterpretNonCompilable'
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:26)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:18)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:18)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:28)
	at is.hail.backend.spark.SparkBackend.is$hail$backend$spark$SparkBackend$$_execute(SparkBackend.scala:317)
	at is.hail.backend.spark.SparkBackend$$anonfun$execute$1.apply(SparkBackend.scala:304)
	at is.hail.backend.spark.SparkBackend$$anonfun$execute$1.apply(SparkBackend.scala:303)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:20)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:18)
	at is.hail.utils.package$.using(package.scala:600)
	at is.hail.annotations.Region$.scoped(Region.scala:18)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:18)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:229)
	at is.hail.backend.spark.SparkBackend.execute(SparkBackend.scala:303)
	at is.hail.backend.spark.SparkBackend.executeJSON(SparkBackend.scala:323)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

java.lang.IllegalArgumentException: Self-suppression not permitted
	at java.lang.Throwable.addSuppressed(Throwable.java:1072)
	at java.io.FilterOutputStream.close(FilterOutputStream.java:159)
	at is.hail.utils.package$.using(package.scala:602)
	at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:1666)
	at is.hail.expr.ir.TableRename.execute(TableIR.scala:2288)
	at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:1432)
	at is.hail.expr.ir.TableFilter.execute(TableIR.scala:905)
	at is.hail.expr.ir.TableMapGlobals.execute(TableIR.scala:1737)
	at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:1432)
	at is.hail.expr.ir.TableMapGlobals.execute(TableIR.scala:1737)
	at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:1432)
	at is.hail.expr.ir.TableFilter.execute(TableIR.scala:905)
	at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:1432)
	at is.hail.expr.ir.TableMapGlobals.execute(TableIR.scala:1737)
	at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:1432)
	at is.hail.expr.ir.TableMapGlobals.execute(TableIR.scala:1737)
	at is.hail.expr.ir.TableMapRows.execute(TableIR.scala:1432)
	at is.hail.expr.ir.Interpret$.run(Interpret.scala:688)
	at is.hail.expr.ir.Interpret$.alreadyLowered(Interpret.scala:53)
	at is.hail.expr.ir.InterpretNonCompilable$.interpretAndCoerce$1(InterpretNonCompilable.scala:16)
	at is.hail.expr.ir.InterpretNonCompilable$.is$hail$expr$ir$InterpretNonCompilable$$rewrite$1(InterpretNonCompilable.scala:53)
	at is.hail.expr.ir.InterpretNonCompilable$.apply(InterpretNonCompilable.scala:58)
	at is.hail.expr.ir.lowering.InterpretNonCompilablePass$.transform(LoweringPass.scala:50)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:15)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:15)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:15)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:13)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass$class.apply(LoweringPass.scala:13)
	at is.hail.expr.ir.lowering.InterpretNonCompilablePass$.apply(LoweringPass.scala:45)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:20)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:18)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:18)
	at is.hail.expr.ir.CompileAndEvaluate$._apply(CompileAndEvaluate.scala:28)
	at is.hail.backend.spark.SparkBackend.is$hail$backend$spark$SparkBackend$$_execute(SparkBackend.scala:317)
	at is.hail.backend.spark.SparkBackend$$anonfun$execute$1.apply(SparkBackend.scala:304)
	at is.hail.backend.spark.SparkBackend$$anonfun$execute$1.apply(SparkBackend.scala:303)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:20)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:18)
	at is.hail.utils.package$.using(package.scala:600)
	at is.hail.annotations.Region$.scoped(Region.scala:18)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:18)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:229)
	at is.hail.backend.spark.SparkBackend.execute(SparkBackend.scala:303)
	at is.hail.backend.spark.SparkBackend.executeJSON(SparkBackend.scala:323)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

java.io.IOException: All datanodes [DatanodeInfoWithStorage[10.128.0.39:9866,DS-1574e23a-74ad-446e-b006-00a5b93f1095,DISK]] are bad. Aborting...
	at org.apache.hadoop.hdfs.DataStreamer.handleBadDatanode(DataStreamer.java:1538)
	at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1472)
	at org.apache.hadoop.hdfs.DataStreamer.processDatanodeError(DataStreamer.java:1244)
	at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:663)

Hail version: 0.2.44-6cfa355a1954
Error summary: IOException: All datanodes [DatanodeInfoWithStorage[10.128.0.39:9866,DS-1574e23a-74ad-446e-b006-00a5b93f1095,DISK]] are bad. Aborting...

Whoah, what’s HDFS doing? Are we even putting any data there?