Computation speed of hail aggregation

I was using Hail to compute polygenic scores in All of Us, but the computation seems to be very slow. The All of Us genotype array has 1,824,517 variants and 165,127 individuals, and I used hl.agg.sum() to aggregate a score for each individual:

import os
import hail as hl

mt_array_path = os.getenv("MICROARRAY_HAIL_STORAGE_PATH")
mt_array = hl.read_matrix_table(mt_array_path)

# generate some random effect sizes
mt_array = mt_array.annotate_rows(rand = hl.rand_unif(-1, 1))
mt_array = mt_array.annotate_cols(
    score = hl.agg.sum(
        mt_array.rand * mt_array.GT.n_alt_alleles()
    )
)

mt_array.cols().export(f'{bucket}/test/Score_rand.tsv')

The above computation took more than 30 minutes with 2 workers and 10 preemptible workers.

For comparison, I used R on my Mac to simulate the PGS calculation with the same number of variants:

rand = runif(1824517, -1, 1)    # random effect sizes, one per variant
geno = rbinom(1824517, 2, 0.2)  # simulated alt-allele counts (0/1/2)
microbenchmark::microbenchmark(sum(rand * geno), times = 100)

It took ~10 ms to aggregate the score for one individual, which works out to ~25 minutes for all 165,127 samples.

Any suggestions on speeding up PGS calculation would be really appreciated!

Hey @TaotaoTan !

As a start, I’ll just say: Hail is slow. We’re a small team frantically trying to engineer a system that can process >1M whole genome samples. The use case we’ve most optimized for is the gnomAD quality control process. That has 100s of millions of variants and 1M samples. Matrix multiply (what you’re doing here) is not something we’ve optimized for.

I’ll feed this particular example back to the team. We made some changes to randomness recently and that appears to have had a dramatic & negative effect on speed, unfortunately. We’re working on a fix for that now. I’d be curious how fast that pipeline is without the rand_unif (just use 2 or something).

That said, you could improve this pipeline slightly: Hail isn’t designed to write TSVs quickly. Replace that with cols().write(f'{bucket}/test/score_rand.ht'). That’s a more fair comparison to what R is doing. If you want to use that data in Python, try collect() instead of export.
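
For concreteness, a minimal sketch of both alternatives, reusing the mt_array and bucket names from the post above (the output path is just an example):

cols = mt_array.cols()

# Write a Hail Table instead of exporting a TSV:
cols.write(f'{bucket}/test/score_rand.ht', overwrite=True)

# Or pull the scores straight into Python instead of exporting:
scores = cols.score.collect()  # one value per sample, in the table's key order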

What is the schema of mt_array? Can you print mt.describe()?

Hi @tpoterba,

Thanks for replying. I was using Hail version 0.2.107-2387bb00ceee to analyze a matrix table with this schema. I wonder if the n_alt_alleles() call is what takes the time to execute.

Global fields:
    None

Column fields:
    's': str

Row fields:
    'locus': locus<GRCh38>
    'alleles': array<str>
    'rsid': str
    'filters': set<str>
    'info': struct {
        AC: array<int32>,
        AF: array<float64>,
        AN: int32,
        homozygote_count: array<int32>
    }

Entry fields:
    'BAF': float64
    'GT': call
    'IGC': float64
    'LRR': float64
    'NORMX': float64
    'NORMY': float64
    'R': float64
    'THETA': float64
    'X': int32
    'Y': int32

Column key: ['s']
Row key: ['locus', 'alleles']

Hi @danking,

Thanks so much for the suggestions! I appreciate your team's hard work building this scalable software for genomic analysis!

As suggested, I changed my code to:

mt_array_path = os.getenv("MICROARRAY_HAIL_STORAGE_PATH")
mt_array = hl.read_matrix_table(mt_array_path)

mt_array = mt_array.annotate_cols(
    score = hl.agg.sum(
        2 * mt_array.GT.n_alt_alleles()
    )
)

mt_array.cols().write(f'{bucket}/test/Score_rand.ht')

But the speed doesn't seem to improve much (still > 30 minutes). I am wondering if you know of any alternative software for computing PGS on large-scale data. I have ruled out PLINK, since All of Us stores the WGS data across 20,000 VCF files, which makes PLINK infeasible to use.

Again, I really appreciate your suggestions!

So, I think Tim is getting at the fact that Hail might be network-bound in this case.

1,000,000 variants across 160,000 samples, with 7 float64s, 1 GT (32 bits, sadly), and 2 int32s per entry. In terabytes, that corresponds to:

In [3]: (8 * 7 + 4 * 3) * 1_000_000 * 160_000 / 1024 / 1024 / 1024 / 1024
Out[3]: 9.89530235528946

Your nodes probably have a 16 Gbps connection. That's 2 GB/s each, or roughly 24 GB/s across your 12 instances. If Hail could process this data at line-speed, that's ~422 seconds, or about seven minutes. In reality, the data is compressed, so the time to read should be maybe 1/3 to 1/7th of that. On the other hand, Hail probably isn't processing data at line-speed (per core, line-speed is 125 MB/s on 16-core nodes, or 250 MB/s on 8-core nodes). I suspect Hail processes data at 20-50 MB/s per core, so 10-20% of ideal.
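
Spelled out as a back-of-envelope (assuming all 12 instances read at that 2 GB/s line rate):

# total bytes from the estimate above, over 12 instances at ~2 GiB/s each
total_bytes = (8 * 7 + 4 * 3) * 1_000_000 * 160_000
seconds_at_line_speed = total_bytes / (12 * 2 * 1024**3)   # ~422 s, i.e. ~7 minutes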

Comparing to your observed time of 30 minutes for 12 instances:

In [16]: (8 * 7 + 4 * 3 ) * 1_000_000 * 160_000 / 1024 / 1024 / 1024 / 12 / 30 / 60
Out[16]: 0.4691106301766855

That's 469 MB/s per instance, which is either 29 MB/s or 59 MB/s per core (16 or 8 cores per instance, respectively).

OK, all of which is to say: if you're trying to rapidly experiment with different PRSes, I think you should make your matrix table smaller:

mt = mt.select_rows()                               # drop all row annotations
mt = mt.select_cols()                               # drop all column annotations
mt = mt.select_entries(x = mt.GT.n_alt_alleles())   # keep only an alt-allele count per entry
mt.write('tiny-mt.mt')

My hope is that this file is 4 bytes per entry and about 596 GB. With 12 instances that’s 50 GB per instance. If Hail maintains its processing speed of 469 MB/s per instance that’s ~105 seconds. If you use 4x the instances, you could get that to ~25 seconds (for the same monetary cost!). I suspect you’ll see diminishing returns at larger and larger numbers of instances.
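
For what it's worth, here is roughly what the scoring step could look like against that slimmed-down table; the beta annotation below is a placeholder, and in practice you would join real effect sizes onto the rows from a weights table keyed by locus and alleles:

import hail as hl

mt = hl.read_matrix_table('tiny-mt.mt')    # the slimmed table written above

# placeholder effect sizes, standing in for a real weights join
mt = mt.annotate_rows(beta = hl.rand_unif(-1, 1))

# x is the n_alt_alleles entry field kept by select_entries above
mt = mt.annotate_cols(score = hl.agg.sum(mt.beta * mt.x))
mt.cols().write('scores.ht', overwrite=True)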

One more thing you can try: exporting to PLINK format. It would be a giant file, but it should be ~37GB, if you can successfully construct it. You could download that onto a VM’s hard drive and use plink to compute PRSes. An SSD should give you ~500MB/s, which is ~74 seconds to do one PRS. If you have enough RAM to fit that dataset in memory, you should be able to do it much much faster.
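
If you go that route, a rough sketch might look like the following; the output paths and the weights file are hypothetical, and plink 1.9's --score expects variant-ID, allele, and effect-size columns:

import hail as hl

mt = hl.read_matrix_table(mt_array_path)   # the original array MT, which still has GT
hl.export_plink(mt, f'{bucket}/test/array_geno', call=mt.GT, ind_id=mt.s)

# then, after copying the .bed/.bim/.fam onto a VM's local SSD, something like:
#   plink --bfile array_geno --score weights.txt 1 2 3 sum --out prs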

Thanks so much @danking. Indeed, the speed improved to ~2 minutes after I removed the irrelevant fields. That solved the issue!!

Thanks so much for your help all!
I have one further question regarding the best options for scaling, as Taotao and I are hoping to extend this same pipeline to the much bigger All of Us (AoU) WGS data, in addition to the array data that Taotao is currently piloting this work on.

The AoU WGS data is massive (~100k people and >700M variants in the current release, with a 2-3x larger release scheduled this year), so I am assuming that even after stripping the entry annotations, the data will likely still be too big for us to practically store in our lab's cloud space (which, of course, we pay for), at least in the long term. I can ping the AoU team to see whether I can convince them to release another centrally stored file with only GT in the entries, but I wouldn't bank on that.

So, I'm wondering about your 2c on the best options for running this pipeline on the WGS data. I see two options with Hail:

  1. temporarily writing out and saving a smaller but still extremely large WGS file without entry fields for use in just this project (I am assuming it will be too big for us to keep around indefinitely)
  2. not writing out a new file, but doing the entry-field selection on the fly and then computing PGS across ~30 phenos, for 6 populations each

Do you have a sense of the cost tradeoff between these options?

Thanks so much!
Elizabeth

Saving a second copy with just GT makes sense if you're going to be using it frequently, but you can probably compute all of these PGSes in just one pass over the data. The cost of a pipeline can be oversimplified as cost_to_read + cost_to_compute, and in the example in the original post in this thread, the cost_to_read dominated. If you're doing more work on the data as you read it (computing 180 PGSes), then cost_to_compute increases, and the "waste" cost of doing the IO is going to be pretty small.
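
To make the one-pass idea concrete, here's a sketch under the assumption that each phenotype/population's effect sizes have already been joined onto the rows; the beta_* field names and table paths are made up:

import hail as hl

mt = hl.read_matrix_table('tiny-mt.mt')    # slimmed MT with entry field x = n_alt_alleles

# assume one row field of effect sizes per phenotype/population, e.g. joined in via
#   mt = mt.annotate_rows(beta_pheno1 = weights_ht[mt.row_key].beta, ...)
beta_fields = ['beta_pheno1', 'beta_pheno2']   # ...up to ~180 of these

# every score is computed in a single pass over the entries
mt = mt.annotate_cols(scores = hl.struct(**{
    b: hl.agg.sum(mt[b] * mt.x) for b in beta_fields
}))
mt.cols().write('all_scores.ht', overwrite=True)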

In all, I wouldn't worry tremendously about doing this at a reasonable cost for AoU.

Makes sense! Thank you so much for your guys’ help optimizing things here!

Could you please let me know the compute resource allocation of your environment? I am computing a polygenic score using just ~4,600 SNPs from the WGS data, and it took 4 days of running time using Hail. I am following the PGS code provided by Lenon et al. (Nature Medicine) on the All of Us Researcher Workbench.