Shuffling and writing a MatrixTable appears to run the shuffle op twice

I’ve been learning Hail and Spark, and I can’t tell if the following is expected behavior. I have a large MT on GCS (~2.5 TB) with very poor data distribution at the moment (99%+ of the data sits in 11 of 1709 partitions). I’m looking to shuffle it and write it back to GCS (to a new destination). I tried the following commands:

import hail as hl

ds = hl.read_matrix_table(f'{bucket}/my_data.mt')
ds = ds.repartition(500, shuffle=True)  # full shuffle into 500 partitions
ds.write(f'{bucket}/shuffled.mt')

Keeping an eye on the Spark UI, it looks like the load and repartition are being run twice before the output is written. See below:


(Note that this run is just shy of a TB, as I’m having to filter the data down into chunks; it was failing at the full size.)

Am I misinterpreting this? Is there something I’m doing that would cause it, or is there a way to avoid it?

This is expected. The implementation of repartition does a random hash-based shuffle first to balance the partitions, then re-sorts. It’s also possible to repartition from native MatrixTable files by writing them out and reading them back with the _n_partitions=... argument, but in your case that might be super expensive because of the skewed partition sizes.
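If it helps, here’s a rough sketch of that read-with-_n_partitions approach. Since your data is already a native MT, you could skip the extra write and just re-read it with the argument; the output path and the count of 500 are placeholders, and _n_partitions is an underscored (internal) argument, so its behavior may change between Hail versions:

import hail as hl

# re-read the existing native MT, asking Hail to split it into ~500 partitions
ds = hl.read_matrix_table(f'{bucket}/my_data.mt', _n_partitions=500)
ds.write(f'{bucket}/repartitioned.mt')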

How did all the data end up in 11 partitions?

Ok, thank you. I couldn’t tell from the documentation (or from going one or two function calls deep into the code) what the _n_partitions argument would actually do, so that’s great to know.

It’s an unusual situation, as I had some limitations on the environment. I imported a (large) number of single-sample VCFs one by one and unioned the columns in blocks of 20, then 20x20 (and so on), roughly as sketched below. I’m guessing the smaller MTs were partitioned unusually because I was using a bunch of small Hail deployments (3 executors each). In retrospect, I did see the “min” and “max” sizes of the partitions in the early phases (specifically 1 row on the small end), but I didn’t recognize the risk of skew and wrote it off.
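For context, this is roughly the pattern I used; the paths, block size, reference genome, and the outer row join here are placeholders, not the exact code I ran:

import functools
import hail as hl

# placeholder paths for the single-sample VCFs
all_paths = [f'{bucket}/vcfs/sample_{i}.vcf.bgz' for i in range(400)]

def union_block(vcf_paths):
    # import each single-sample VCF and merge columns pairwise within the block
    mts = [hl.import_vcf(p, reference_genome='GRCh38') for p in vcf_paths]
    return functools.reduce(
        lambda left, right: left.union_cols(right, row_join_type='outer'), mts)

# first level: blocks of 20 samples
blocks = [union_block(all_paths[i:i + 20]) for i in range(0, len(all_paths), 20)]

# next level: union the block-level MTs (20 x 20, and so on)
combined = functools.reduce(
    lambda left, right: left.union_cols(right, row_join_type='outer'), blocks)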