Densify running out of memory

I’m running into an out of memory error when trying to densify a MatrixTable:

[Stage 15:===>                                            (2002 + 3006) / 30000]
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007fb132d80000, 4354736128, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 4354736128 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /tmp/c98e624072f34b96a616efa158df27f5/hs_err_pid31285.log

Here is the code:

def main(args):
    hl.init(log="/create_qc_data.log", default_reference="GRCh38")

    data_source = "broad"
    freeze = args.freeze

    if args.compute_qc_mt:
        logger.info("Reading in raw MT...")
        mt = get_ukbb_data(
            data_source,
            freeze,
            key_by_locus_and_alleles=True,
            split=False,
            raw=True,
            repartition=args.repartition,
            n_partitions=args.raw_partitions,
        )
        mt = mt.select_entries("LGT", "GQ", "DP", "LAD", "LA", "END")

        logger.info("Reading in QC MT sites from tranche 2/freeze 5...")
        if not hl.utils.hadoop_exists(f"{qc_sites_path()}/_SUCCESS"):
            get_qc_mt_sites()
        qc_sites_ht = hl.read_table(qc_sites_path())
        logger.info(f"Number of QC sites: {qc_sites_ht.count()}")

        logger.info("Densifying sites...")
        last_END_ht = hl.read_table(last_END_positions_ht_path(freeze))
        mt = densify_sites(mt, qc_sites_ht, last_END_ht)

        logger.info("Checkpointing densified MT")
        mt = mt.checkpoint(
            get_checkpoint_path(
                data_source, freeze, name="dense_qc_mt_v2_sites", mt=True
            ),
            overwrite=True,
        )

        logger.info("Repartitioning densified MT")
        mt = mt.naive_coalesce(args.n_partitions)
        mt = mt.checkpoint(
            get_checkpoint_path(
                data_source, freeze, name="dense_qc_mt_v2_sites.repartitioned", mt=True,
            ),
            overwrite=True,
        )

        # NOTE: Need MQ, QD, FS for hard filters
        logger.info("Adding info and low QUAL annotations and filtering to adj...")
        info_expr = get_site_info_expr(mt)
        info_expr = info_expr.annotate(**get_as_info_expr(mt))
        mt = mt.annotate_rows(info=info_expr)
        mt = mt.annotate_entries(GT=hl.experimental.lgt_to_gt(mt.LGT, mt.LA))
        mt = mt.select_entries("GT", adj=get_adj_expr(mt.LGT, mt.GQ, mt.DP, mt.LAD))
        mt = filter_to_adj(mt)

        logger.info("Checkpointing MT...")
        mt = mt.checkpoint(
            get_checkpoint_path(
                data_source,
                freeze,
                name=f"{data_source}.freeze_{freeze}.qc_sites.mt",
                mt=True,
            ),
            overwrite=True,
        )

densify_sites: https://github.com/broadinstitute/gnomad_methods/blob/master/gnomad/utils/sparse_mt.py#L62

I added two checkpoints to densify_sites, one for sites_ht and one for mt. After the densify crashed, I re-ran the code from the checkpointed data and got the same error. The log doesn’t seem to want to attach, but I can send it via Slack.
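
For reference, the checkpointing pattern I used inside densify_sites looks roughly like this (a minimal sketch; the helper name and paths are placeholders, not part of densify_sites itself):

def checkpoint_densify_inputs(mt: hl.MatrixTable, sites_ht: hl.Table, tmp_dir: str):
    """Persist both densify inputs so a failed densify can be re-run from disk."""
    # Checkpoint the sites Table first; it is small relative to the MT.
    sites_ht = sites_ht.checkpoint(f"{tmp_dir}/densify_sites.sites.ht", overwrite=True)
    # Checkpoint the sparse MT so re-runs skip all upstream transformations.
    mt = mt.checkpoint(f"{tmp_dir}/densify_sites.input.mt", overwrite=True)
    return mt, sites_ht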

I have already run a densify on the same input MT successfully for another task. Could you help me figure out why this smaller densify on fewer rows is running out of memory (same cluster configuration)?

this one I think should also be fixed by 0.2.35. The commit that broke it was the last commit before we released 0.2.34 :upside_down_face:

We found a separate memory leak in 0.2.35. Should actually be fixed in 0.2.36, which we’ll try to release today.

thanks for the fix!! the densified MT wrote successfully

I have to re-run a separate step that densifies and am getting this error again:

OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007efb7bb80000, 2912944128, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 2912944128 bytes for committing reserved memory.

Code:

def compute_interval_callrate_dp_mt(
    data_source: str,
    freeze: int,
    mt: hl.MatrixTable,
    intervals_ht: hl.Table,
    bi_allelic_only: bool = True,
    autosomes_only: bool = True,
    target_pct_gt_cov: List[int] = [10, 20],
) -> None:
    """
    Computes sample metrics (n_defined, total, dp_sum, mean_dp, pct_gt_{target_pct_gt_cov}x, pct_dp_defined) per interval.

    Mean depth, pct_gt_{target_pct_gt_cov}x, and pct_dp_defined annotations are used during interval QC.
    Mean depth and callrate annotations (mean_dp, n_defined, total) are used during hard filtering.
    Callrate annotations (n_defined, total) are also used during platform PCA.
    Writes the call rate MT (aggregated MatrixTable) keyed by interval row-wise and by sample column-wise.
    Assumes that all overlapping intervals in intervals_ht are merged.
    NOTE: This function requires a densify! Please use an autoscaling cluster.

    :param str data_source: One of 'regeneron' or 'broad'
    :param int freeze: One of the data freezes
    :param MatrixTable mt: Input MatrixTable.
    :param Table intervals_ht: Table with capture intervals relevant to input MatrixTable.
    :param bool bi_allelic_only: If set, only bi-allelic sites are used for the computation.
    :param bool autosomes_only: If set, only autosomal intervals are used.
    :param List target_pct_gt_cov: Coverage levels to check for each target. Default is [10, 20].
    :return: None
    """
    logger.warning(
        "This function will densify! Make sure you have an autoscaling cluster."
    )
    logger.info("Computing call rate and mean DP MatrixTable...")
    if len(intervals_ht.key) != 1 or not isinstance(
        intervals_ht.key[0], hl.expr.IntervalExpression
    ):
        logger.warning(
            f"Call rate matrix computation expects `intervals_ht` with a key of type Interval. Found: {intervals_ht.key}"
        )
    logger.info("Densifying...")
    mt = mt.drop("gvcf_info")
    mt = hl.experimental.densify(mt)

    logger.info(
        "Filtering out lines that are only reference or not covered in capture intervals"
    )
    logger.warning(
        "Assumes that overlapping intervals in capture intervals are merged!"
    )
    mt = mt.filter_rows(hl.len(mt.alleles) > 1)
    intervals_ht = intervals_ht.annotate(interval_label=intervals_ht.interval)
    mt = mt.annotate_rows(interval=intervals_ht.index(mt.locus).interval_label)
    mt = mt.filter_rows(hl.is_defined(mt.interval))

    if autosomes_only:
        mt = filter_to_autosomes(mt)

    if bi_allelic_only:
        mt = mt.filter_rows(bi_allelic_expr(mt))

    logger.info(
        "Grouping MT by interval and calculating n_defined, total, and mean_dp..."
    )
    mt = mt.annotate_entries(GT=hl.experimental.lgt_to_gt(mt.LGT, mt.LA))
    mt = mt.select_entries(
        GT=hl.or_missing(hl.is_defined(mt.GT), hl.struct()),
        DP=hl.if_else(hl.is_defined(mt.DP), mt.DP, 0),
    )
    mt = mt.group_rows_by(mt.interval).aggregate(
        n_defined=hl.agg.count_where(hl.is_defined(mt.GT)),
        total=hl.agg.count(),
        dp_sum=hl.agg.sum(mt.DP),
        mean_dp=hl.agg.mean(mt.DP),
        **{
            f"pct_gt_{cov}x": hl.agg.fraction(mt.DP >= cov) for cov in target_pct_gt_cov
        },
        pct_dp_defined=hl.agg.count_where(mt.DP > 0) / hl.agg.count(),
    )
    mt.write(callrate_mt_path(data_source, freeze, interval_filtered=False))

and

logger.warning("Computing the call rate MT requires a densify!\n")
logger.info("Reading in raw MT...")
mt = get_ukbb_data(
    data_source,
    freeze,
    split=False,
    key_by_locus_and_alleles=True,
    raw=True,
    repartition=args.repartition,
    n_partitions=n_partitions,
)
capture_ht = hl.read_table(capture_ht_path(data_source))
compute_interval_callrate_dp_mt(
    data_source,
    freeze,
    mt,
    capture_ht,
    autosomes_only=False,
    target_pct_gt_cov=target_pct_gt_cov,
)

I will send the log via email

this is 0.2.36, right?

yup!

I tried running the code again on 0.2.37 using 15,000 partitions (instead of the 30,000 I used three days ago). The code got farther, but I got this error:

hail.utils.java.FatalError: SparkException: Job aborted due to stage failure: Task 6938 in stage 14.0 failed 20 times, most recent failure: Lost task 6938.19 in stage 14.0 (TID 39393, kc1-w-103.c.maclab-ukbb.internal, executor 1539): ExecutorLostFailure (executor 1539 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits.  20.0 GB of 20 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
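
The message itself suggests boosting spark.yarn.executor.memoryOverhead. Here is roughly what I could try (a sketch only: the values are untuned guesses, and on Dataproc these properties generally have to be set at cluster creation time, e.g. via --properties, rather than in hl.init):

hl.init(
    log="/create_qc_data.log",
    default_reference="GRCh38",
    # Sketch: values are guesses for highmem-8 workers, not tuned recommendations.
    spark_conf={
        # Shrink the executor heap to leave room for native allocations...
        "spark.executor.memory": "16g",
        # ...and grow the off-heap overhead that YARN counts against the container.
        "spark.yarn.executor.memoryOverhead": "4g",
    },
)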

Do you have any guidance for cluster configuration? I used 300 highmem-8 workers.

@cdv can you look into this?

as an update, I tried using a larger cluster on 0.2.37, which still didn’t work. I reverted to 0.2.34, and somehow the job ran successfully. I will email the log

I just spoke with @ch-kr. I asked them to run the computation again without the densify, and it OOMed again. This leads me to believe that the issue is in group_rows_by rather than densify. I’m going to look into the memory usage there. We already expose a hidden parameter on Table’s group-by aggregation that allows explicit control over the buffer used in the aggregation; I’m going to extend MatrixTable’s group_rows_by to have the same functionality.
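
Once that lands, usage would look something like this (hypothetical sketch: the `_buffer_size` name, its semantics, and MatrixTable support are assumptions about unreleased API, and the final shape may differ):

# Hypothetical sketch of the planned knob; `_buffer_size` is an assumed name
# for the hidden Table group-by buffer control, not released API.
mt = mt.group_rows_by(mt.interval).aggregate(
    n_defined=hl.agg.count_where(hl.is_defined(mt.GT)),
    total=hl.agg.count(),
    # A smaller buffer should lower peak memory at some cost in speed (assumed semantics).
    _buffer_size=16,
)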