LD matrix writing doesn't parallelize correctly and yields very large files

Hi!

We are currently trying to calculate the LD matrix for the publicly-accessible 1000G dataset, more specifically chr 22.
To that end, we

  1. import the VCF file and save it as a matrix table,
  2. load the matrix table as mt,
  3. run the following to repartition, calculate the LD matrix and save it to S3:
> mt = mt.repartition(13)
> hl.ld_matrix(mt.GT.n_alt_alleles(), mt.locus, 10e6).sparsify_triangle().write("s3://some/prefix/ld_matrix.bm")

At the beginning, there is a phase of maybe 30 minutes with around 300 steps, and from earlier tests we know that this is the ld_matrix() call. But then writing the LD matrix to S3 is very, very slow. In fact, only two blocks seem to be written at the same time, and writing those two blocks takes between 1 and 1.5 minutes. Our cluster configuration has 3 core nodes and 15 task nodes, so we would expect at least 15, if not 18, concurrent writes. This is an AWS EMR cluster, by the way.

Furthermore, the total number of blocks that will be written is around 18,000, with a block averaging around 50 MiB. That makes for a total size of roughly 900 GiB, which seems very large. We are wondering whether there is some sparsity structure that is not being exploited. But we also realize that the 10 Mb LD radius is quite large, and chr 22 is only ~50 Mb, so it spans only about five 10 Mb windows.

We would greatly appreciate any help with that issue!
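
For reference, the size figures in this post can be checked with a bit of arithmetic, and the number of blocks can be roughly bounded. This is only a sketch under stated assumptions: variants are treated as evenly spaced (real data is not), blocks are square with side 4096 (Hail's default block size), and only upper-triangle blocks that touch the LD window are assumed to be kept. The function names are made up for illustration and are not part of Hail.

```python
import math

MIB = 1024 ** 2
GIB = 1024 ** 3


def total_size_gib(n_blocks, avg_block_mib):
    """Total size in GiB of n_blocks blocks averaging avg_block_mib MiB each."""
    return n_blocks * avg_block_mib * MIB / GIB


def dense_block_mib(block_size=4096, bytes_per_entry=8):
    """Uncompressed size in MiB of one dense square block of float64 entries."""
    return block_size ** 2 * bytes_per_entry / MIB


def banded_upper_block_count(n_variants, span_bp, radius_bp, block_size=4096):
    """Count blocks (i, j) with i <= j that contain at least one variant pair
    within radius_bp of each other, ASSUMING evenly spaced variants."""
    n_blocks = math.ceil(n_variants / block_size)
    # Under even spacing, the radius corresponds to this many variant indices.
    window = int(radius_bp / span_bp * n_variants)
    count = 0
    for i in range(n_blocks):
        row_end = min((i + 1) * block_size, n_variants) - 1
        for j in range(i, n_blocks):
            col_start = j * block_size
            # Smallest column-minus-row index gap found anywhere in block (i, j).
            if col_start - row_end <= window:
                count += 1
    return count


print(total_size_gib(18_000, 50))  # 878.90625
print(dense_block_mib())           # 128.0
```

A dense 4096 x 4096 float64 block is 128 MiB, so an average of about 50 MiB per block would be consistent with blocks that are partly empty or compressed on disk. Calling `banded_upper_block_count` with your own variant count and chromosome span gives a rough idea of how many blocks survive a 10 Mb window.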

Hi Simeon,

I’m guessing that you’ve experimented with this parameter already, but I’m curious how performance changes when you pass a different value to ‘repartition’. If you set that value to 500, does that speed things up or slow things down?

As far as overall speed when running on AWS, I also wonder about writing to S3. Are those buckets locking for some reason, forcing Hail to perform all disk I/O with only one task node at a time? It might be worth provisioning an FSx for Lustre disk, since those disks are built to be shareable and to handle indexed files, and they can be provisioned to be high-speed. I’m not sure how you would set up your EMR cluster to reference such a disk, but if you used such a disk as both the source for the 1000G data and the target for the resulting LD matrix, then maybe that would help?

As far as the size of the resulting file and the possibility of a sparser representation, I don’t have any suggestions. Anybody else?

I actually thought about exactly that issue (the partitions), too. My thinking was that if we reduce the size of the partitions, then we might even get around calculating and storing LD matrices and can instead calculate LD on-the-fly in reasonable time.
So when checking yesterday for chr 10, I found out that the matrix table we currently store has only six partitions, and thus (I guess) even if we’re interested in only a small region, we have to read in a block of 134 / 6 ≈ 22 Mbp, unless Hail / Spark does some kind of smart partitioning depending on the data density. I then increased the number of partitions to 30, but that didn’t give any measurable speedup in my routine of calculating LD on-the-fly.

I would be very surprised if writing to S3 is locking from the S3 side. In fact, if all workers / task nodes could write concurrently, we would easily get into a feasible range for the LD matrix computation.
I might try storing a matrix table in HDFS and check whether reading it is faster than reading from S3.
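
To put numbers on "a feasible range": a back-of-the-envelope estimate using the figures from the first post (about 18,000 blocks, 1 to 1.5 minutes per round of writes). This is a sketch that assumes the time per round stays the same as concurrency grows, which S3 throughput and executor memory may not allow in practice. The function name is made up for illustration.

```python
import math


def write_hours(n_blocks, concurrent_writers, minutes_per_round):
    """Wall-clock hours to write n_blocks when concurrent_writers blocks are
    written per round and each round takes minutes_per_round minutes."""
    rounds = math.ceil(n_blocks / concurrent_writers)
    return rounds * minutes_per_round / 60


# Observed: 2 blocks at a time.
print(write_hours(18_000, 2, 1.0), write_hours(18_000, 2, 1.5))    # 150.0 225.0
# Hoped for: one writer per core/task node (3 + 15 = 18).
print(write_hours(18_000, 18, 1.5))                                # 25.0
```

So with two writers the write alone would take on the order of a week, while 18 concurrent writers would bring it down to roughly a day under the same per-round timing.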

Repartitioning this way is quite expensive – it’ll be much more efficient to choose the number of partitions you want when importing the VCF with min_partitions=13.

The issue to me sounds like the low parallelism (only two writers). The total size – around 1 TB – sounds like it’s in the realm of what I’d expect. LD matrices scale with the number of variants, so the small sample size of 1000G isn’t reducing the size here.

Could you share the Hail log file? This would be written to the working directory of the driver VM.

You might also try using a higher number of partitions than you expect, maybe 1000, for the initial import. Hail does block multiplication with square blocks of dimension 4096 by default, so writing the matrix table as a block matrix – an intermediate step of computing the LD matrix – may be slow and not super parallel if you’ve only got a few partitions coming in.
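
A minimal sketch of what this suggestion could look like, setting the partition count at import time instead of calling repartition afterwards. The function name and all paths are placeholders, and the default of 1000 partitions is just the ballpark mentioned above, not a tuned value. The import of Hail sits inside the function only so the helper can be defined on a machine without Hail installed.

```python
def write_ld_matrix(vcf_path, mt_path, bm_path, n_partitions=1000, radius_bp=10e6):
    """Import a VCF with many partitions, then compute and write the LD matrix."""
    import hail as hl  # requires a working Hail / Spark installation

    # Ask for the partitioning at import time rather than repartitioning later.
    # Block-gzipped `.vcf.gz` inputs additionally need force_bgz=True.
    mt = hl.import_vcf(vcf_path, min_partitions=n_partitions)
    mt.write(mt_path, overwrite=True)

    # Read the matrix table back so later steps start from the native format.
    mt = hl.read_matrix_table(mt_path)

    # LD between variants within radius_bp of each other, upper triangle only.
    ld = hl.ld_matrix(mt.GT.n_alt_alleles(), mt.locus, radius=radius_bp)
    ld.sparsify_triangle().write(bm_path, overwrite=True)
```

Whether this alone fixes the two-writer behaviour is not established in this thread; it only removes the expensive repartition step and gives the block matrix write more input partitions to work with.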

Apologies for coming back to you so late, @tpoterba!
In the meantime, we “solved” the issue by figuring out a different way to speed up our use case - we divvied up the 1000G matrix tables we had by superpopulations and stored them with a higher number of partitions (using mt.repartition()). Then we calculate LD on-the-fly. That makes it sufficiently fast for us, and so for now we don’t need to pre-calculate and save the LD matrix anymore.

Do you mean it is expensive to repartition, or that, once the repartitioning is done, the result is going to be suboptimal and lead to unnecessarily slow follow-up calculations? If the former, I think I did indeed notice that repartitioning was slow, but manageable. If the latter, then it might be worth revisiting this to speed up future access.

I don’t think I still have that log file, but if you still would like to help debug this, I can probably reproduce the issue and post the log file then.
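
For anyone landing here later, the on-the-fly approach described above might look roughly like the sketch below. It is an illustration under assumptions, not the exact routine we run: the function name, the path argument, the example region string, and the default radius are all placeholders.

```python
def region_ld(mt_path, region, radius_bp=1e6):
    """Compute LD for one genomic region and return it as a NumPy array."""
    import hail as hl  # requires a working Hail / Spark installation

    mt = hl.read_matrix_table(mt_path)

    # Keep only variants in the region of interest, e.g. region = "10:1M-2M".
    interval = hl.parse_locus_interval(region)
    mt = hl.filter_intervals(mt, [interval])

    # With few variants left, the LD matrix is small enough to collect locally.
    ld = hl.ld_matrix(mt.GT.n_alt_alleles(), mt.locus, radius=radius_bp)
    return ld.to_numpy()
```

With more partitions in the stored matrix table, the interval filter has to read less data per query, which is presumably where the speedup comes from.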