Best way to repartition heavily-filtered matrix tables?

Hi! I recently posted a question on Zulip chat on how best to repartition heavily-filtered data, however I thought I’d repost here to get any additional thoughts/discussion on what the best approach is to repartitioning.

Prior to the discussion from the link above, I was using the repartition function as per the hail documentation example (dataset_result = dataset.repartition(1000)), but with shuffle=FALSE (you can find that script here). I later found that using shuffle=False can cause a loss of parallelism and that repartitioning data can have some unpredictable behaviour. To avoid this, and to have more consistent behaviour from repartition, I was recommended to use the following code:

    mt = mt.semi_join_rows(mt2.rows())
    mt_path = f'{output}/mt_filtered.mt'
    tmp_path = mt_path + '.tmp'
    mt = mt.checkpoint(tmp_path)
    hl.read_matrix_table(tmp_path, _n_partitions=1000).write(mt_path)
    hl.current_backend().fs.rmtree(tmp_path)

Unfortunately, using this method, my script never seemed to finish after allocating 12 hours of run time (using 20 preemptible workers), and failed with the following error:
WARNING: Job terminated, but output did not finish streaming

I tried changing the workers from preemptible to non-preemtible workers, and also played around with the number of partitions (changing from 100 to 1000). When I omit the repartitioning step, the entire script finishes successfully and relatively quickly (less than an hour) using 20 preemptible workers.

I’m wondering if there is another way of repartitioning heavily-filtered files, or perhaps a better way of avoiding many tiny partitions?

Thanks!

This is the strategy I would recommend. What’s upstream of the semi_join_rows? That this is taking a long time is surprising to me.

Hi Tim, the script itself actually has quite a lot of costly functions going on before the repartitioning, including densifying the dataset and ld-pruning. However, in the log file these operations all finish, leading me to believe it’s the repartitioning step. The job itself also completes relatively quickly when I run it without repartitioning.

This is super weird, I would expect the read(_n_partitions=…).write to be much faster than the rest of this query.

is it correct that the difference between the two scripts is the following:

repartition version

    mt_path = f'{output}/tob_wgs_hgdp_1kg_filtered_variants.mt'
    tmp_path = mt_path + '.tmp'
    hgdp1kg_tobwgs_joined = hgdp1kg_tobwgs_joined.checkpoint(tmp_path)
    hl.read_matrix_table(tmp_path, _n_partitions=100).write(mt_path)
    hl.current_backend().fs.rmtree(tmp_path)

no repartition

    hgdp1kg_tobwgs_joined.write(mt_path)

Yes, that’s exactly right. The only difference between the successful and unsuccessful scripts is the code above. I played around with a few different ways of making the script more efficient (such as reducing the total number of variants for LD pruning), but ultimately just removing the repartitioning step (as in your no repartition example above) seemed to solve the issue. I know you mentioned earlier that repartitioning can be unpredictable at times, particularly when shuffle=False. I’m curious if there are any alternative methods to reducing the total number of partitions, as this will greatly decrease the computational overhead downstream.

OK, I’m totally stumped. This is pretty much the most efficient way to repartition.

If you have the log file for one of these long-running tasks, I would love to look at that – it should tell us where the pipeline is stalled.

(it could be the rmtree in fact)

No problem, I’m attaching two log files here: one with using rmtree with 100 partitions, and one without using rmtree with 1000 partitions.

The script for the first log file (with rmtree) can be found here, and the second one (without rmtree) can be found here.

Hi Katalina,
These actually aren’t the logs I need – Hail writes a log file to the current working directory while it runs, and this is the one that has more detailed information (with timestamps) about the compilation and execution of each query. On Dataproc, these logs are written to files in /home/hail on the driver VM. They can be somewhat large (>100MB) for queries with a lot of Spark logging (which scales with the # of partitions), so feel free to send by email if that’s easier.

For the long-running job, just SSH in and copy out the log when it hits the point that it stops making progress – we shouldn’t need hours of logging about that.

Hi Tim, here’s the link to the log file with the more detailed information: Google Drive: Sign-in

Hi @tpoterba, just a friendly ping- Did you manage to look into the log files to see where the issue might be occurring? Let me know if you need to me to attach anything else!