Preventing a shuffle error

Hi Hail team,

I am running code for the new gnomAD v3.1 minor release and got a shuffle error when using an autoscaling cluster.

This is the region of the code that had the issue:

I’m assuming it is the key_cols_by here: gnomad_qc/create_hgdp_tgp_subset.py at 42d746cc67e5416e5de7021eaf4ee696b58380f9 · broadinstitute/gnomad_qc · GitHub. If so, is there a way to modify sample names without a shuffle?

It’s also entirely possible that I introduced a shuffle somewhere else in the code.

Thanks,
Julia

key_cols_by doesn’t shuffle – it’s adjust_subset_alleles here:
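
For reference, a minimal sketch of modifying sample names via key_cols_by (a toy MatrixTable and a hypothetical rename mapping, not the gnomad_qc code). Since columns aren’t partitioned across the cluster the way rows are, re-keying the columns never triggers a network shuffle:

    import hail as hl

    # Toy MatrixTable with an integer sample index as its column key.
    mt = hl.balding_nichols_model(n_populations=3, n_samples=10, n_variants=100)
    mt = mt.key_cols_by(s=hl.str(mt.sample_idx))

    # Hypothetical renaming map: modifying the column key only rewrites the
    # small column table, so no shuffle is involved.
    rename = hl.literal({str(i): "sample_" + str(i) for i in range(10)})
    mt = mt.key_cols_by(s=rename[mt.s])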

Ah yes, thank you! I will run it now with all workers and add that as a note.

Hey @tpoterba, thank you for looking over this so quickly yesterday! I’m still struggling with this one. I checkpointed the MT right before the key_rows_by (after annotating the MT with the new_locus_alleles info), and I'm now trying to checkpoint the MT right after that line using all workers.

    mt = hl.read_matrix_table("gs://gnomad-tmp/v3.1.2/intermediate_hgdp_tgp_subset_sparse_before_row_key.mt")
    mt = mt.key_rows_by(
        locus=mt.new_locus, alleles=mt.new_alleles
    )
    mt = mt.checkpoint("gs://gnomad-tmp/v3.1.2/intermediate_hgdp_tgp_subset_sparse_after_row_key.mt", overwrite=True)

However, it is hanging for a while at the last 2 partitions:

2021-09-23 05:40:46 Hail: INFO: Ordering unsorted dataset with network shuffle
[Stage 2:==================================================>(70862 + 2) / 70863]

So I’m worried I’m doing something wrong, and wondering if I need to restart it with a different cluster configuration or if I should just let it keep running. I started with the default hailctl dataproc cluster configuration and resized to 80 workers before running the above lines.

Thanks again,
Julia

Is this still running? Looking at the Spark UI might be a good idea. Can you add me to this project so I can poke at the cluster?

After discussion with Tim, I added some code he wrote that splits the key_rows_by into two parts: one for the rows with a locus change and one for the rows without, then combines the two re-keyed MTs with union_rows. If I understand correctly, most rows have no locus change, so the key_rows_by does not need to shuffle for them; only the rows with a locus change need to be shuffled. By doing the split (followed by the union_rows), you avoid a shuffle over the entire MT. This solved my problem of the hanging last 2 partitions.

    def split_shuffle(mt):
        # Rows whose locus is unchanged: per the discussion above, re-keying
        # these does not require a network shuffle.
        mt1 = mt.filter_rows(mt.locus == mt.new_locus)
        mt1 = mt1.key_rows_by(locus=mt1.new_locus, alleles=mt1.new_alleles)

        # Rows whose locus changed: only this (much smaller) subset needs to
        # be shuffled when it is re-keyed.
        mt2 = mt.filter_rows(mt.locus != mt.new_locus)
        mt2 = mt2.key_rows_by(locus=mt2.new_locus, alleles=mt2.new_alleles)

        return mt1.union_rows(mt2)

    mt = hl.read_matrix_table("gs://gnomad-tmp/v3.1.2/intermediate_hgdp_tgp_subset_sparse_before_row_key.mt")
    mt = split_shuffle(mt)
    mt = mt.checkpoint("gs://gnomad-tmp/v3.1.2/intermediate_hgdp_tgp_subset_sparse_after_row_key.mt", overwrite=True)

Thank you @tpoterba!