Dear Hail Team,
I am currently running a large-ish Hail pipeline on an on-premises Spark cluster across different local HPC systems, and I have observed some behaviour related to Hail's resource usage that I am struggling to understand.
In broad terms, I have a VDS with roughly 120k partitions that was built on an external system (with default settings for the VDS combiner) and stored in S3. For processing, I downloaded it to the local filesystem of the HPC system I am working on, hoping for a performance gain from reduced network traffic. The start of the pipeline I am executing uses the following code:
import hail as hl

vds = hl.vds.read_vds(<PATH>)  # O(120_000) partitions
telomeres_and_centromeres_ht = hl.read_table(<PATH>)  # O(10) partitions
tel_centr = telomeres_and_centromeres_ht.aggregate(
    hl.agg.collect(telomeres_and_centromeres_ht.interval)
)
vds = hl.vds.filter_intervals(vds, tel_centr, keep=False)
vds.write(<PATH>, overwrite=True)
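In case it helps with diagnosis, this is roughly how I would inspect the partitioning of the two component MatrixTables right after read_vds and again after the interval filter (just a sketch for illustration, not output from my runs; it only relies on n_partitions()):

# sketch: check the planned partitioning of both VDS components
vds_in = hl.vds.read_vds(<PATH>)
print(vds_in.reference_data.n_partitions())  # ~120k, per the comment above
print(vds_in.variant_data.n_partitions())

# repeat after interval filtering to see whether the plan changed
vds_filtered = hl.vds.filter_intervals(vds_in, tel_centr, keep=False)
print(vds_filtered.reference_data.n_partitions())
print(vds_filtered.variant_data.n_partitions())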
On a test set (chr20, chrX, chrY) this code finished quickly and efficiently, but on the complete callset the analysis ran more or less single-threaded: the code above used only 1 core on one node, out of more than 300 cores in total. As a result, the `filter_intervals` call took more than 10 hours just to write the reference data MatrixTable of the VDS (and presumably much longer for the complete VDS, but I cancelled those runs).
Shuffling the input data by adding the following two lines restored CPU utilisation to the normal levels I usually see from Hail and allowed this step to finish within 45 minutes.
# force a full shuffle of both component MatrixTables down to 1000 partitions
vds.variant_data = vds.variant_data.repartition(1000, shuffle=True)
vds.reference_data = vds.reference_data.repartition(1000, shuffle=True)
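For what it's worth, I assume the same thing could also be written by rebuilding the VDS from the repartitioned components via the public constructor rather than assigning the attributes in place; a sketch of what I mean (assuming hl.vds.VariantDataset(reference_data, variant_data) is the intended way to put a VDS back together):

# sketch: rebuild the VDS from repartitioned components instead of
# mutating vds.variant_data / vds.reference_data in place
vds = hl.vds.VariantDataset(
    reference_data=vds.reference_data.repartition(1000, shuffle=True),
    variant_data=vds.variant_data.repartition(1000, shuffle=True),
)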
What could be the reason for this behaviour? I was under the impression that, with enough partitions, Hail/Spark would distribute the work evenly over all workers, so how does this shuffle improve runtime efficiency to this degree? Is this way of repartitioning the VDS even supported, or do some of the methods in hl.vds assume a matched partitioning scheme between the reference and variant data?
Since I have observed similar behaviour from Hail in other pipelines as well, should I simply force a shuffle on most large tables at import to guarantee that work is distributed evenly across the workers?
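Related to that last question: if forcing a shuffle at import is indeed the recommendation, I assume the cleanest place would be at read time. The n_partitions argument to read_vds is an assumption on my part (I believe newer Hail releases expose it); if it does not exist in my version, I would fall back to the repartition-after-read pattern shown above:

# sketch: request a target partitioning when reading the VDS, assuming the
# installed Hail version's read_vds accepts n_partitions
vds = hl.vds.read_vds(<PATH>, n_partitions=1000)

# fallback if that argument is not available: read, then shuffle both components
# vds = hl.vds.read_vds(<PATH>)
# vds = hl.vds.VariantDataset(
#     reference_data=vds.reference_data.repartition(1000, shuffle=True),
#     variant_data=vds.variant_data.repartition(1000, shuffle=True),
# )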