Resource -> runtime question for large datasets


#1

Greetings!

I’m interested in learning users’ and developers experience with scaling of compute resources and the resulting runtimes when working with large datasets (100k’s of samples, millions of markers).

I know there are an abundant number of factors and configurations that can cause runtime to vary, but I think these data points could be of some value to me and other users who are similarly situated in the planning stages of such a project.

Many thanks,
Jason


#2

There are roughly three parameters that are going to determine runtime:

  1. The size of your data
  2. The pipeline you’re running (linear regression is >10x faster than logistic regression, for instance)
  3. The number of machines you’re using.

Broad analysts have had success running with up to about 1500 CPU cores simultaneously on 30T compressed VCFs, but with more cores than that, Spark’s scalability seems to start to break down.


#3

If you have a rough workflow, we can help you estimate runtime much better.


#4

Thanks Tim,

Was that for the UKBiobank heritability analysis? About how many core hours did it take to process the 30T?

The notes on core limits and regression performance difference are helpful!


#5

The 30T dataset was the gnomad whole genomes – the UKB BGEN is actually about 2T compressed, since BGEN an extremely efficient format that uses 2 bytes per genotype before compression. In memory, though, it’s more like 150 TB.

I can’t find the details right now, but I think the UKB rapid GWAS analysis required about 20 hours on something like 400 cores. I remember it costing about $2k on Google Cloud. Analyzing just one phenotype would be way faster, though!


#6

Ah, fantastic. UKB is in my near future, so this is exactly what I was looking for. I’ll be sure to report back my experience. Thanks again!


#7

Hi, along the same lines, I’ve run a logistic regression on a data set with 3M variants, and 10K samples (using version devel-c6941a8e04e7) on dosage

gwas = hl.logistic_regression(test=‘lrt’, y=outcome_case, x=mt.DS, covariates=[mt.pc1, mt.pc2, mt.pc3, mt.age_ref, gender])
p = hl.plot.qq(gwas.logreg.p_value)

and noticed some unexpected behavior when using 100 nodes where the cluster CPU usage was around 70%.

I also noticed from the Spark console that it was running 3 actions (which correspond to the three periods of work in the cluster CPU plot above):

fold at RVD.scala:362
sortBy at TableIR.scala:1196
collect at ContextRDD.scala

and that the number of tasks was 286.

How is the numbers of tasks determined? Do you think that inefficiencies creep in when the number of nodes gets closer to the number of tasks?

I’m guessing the logistic regression happens during the fold action? But if so, why is so much time taken by the sortBy and collect actions? Also, do you anticipate that the speed of logistic regression will stay the same, or are you planning on working on that area more?

Thanks!


#8

I’ll have a bit more time to explain this later, but you can make your code 3x faster with the following:

gwas = hl.logistic_regression(test=‘lrt’, y=outcome_case, x=mt.DS, covariates=[mt.pc1, mt.pc2, mt.pc3, mt.age_ref, gender])
ht = gwas.rows().cache()
p = hl.plot.qq(ht.logreg.p_value)

#9

How is the numbers of tasks determined?

The number of tasks is the number of parallel partitions. This is a set parameter of a native MT/Table format, and is determined by block size (on hl.init()) for VCF / text import, and by BGEN import parameters.

Do you think that inefficiencies creep in when the number of nodes gets closer to the number of tasks?

Yes, if the number of tasks is the number of cores, then the slowest task will be the total runtime. I’d expect exactly these curves in that case (sometimes with longer tails!)

I’m guessing the logistic regression happens during the fold action?

Yes. And the sortBy, actually. Spark/Hail execution is totally lazy, which can lead to things happening multiple times.

Also, do you anticipate that the speed of logistic regression will stay the same, or are you planning on working on that area more?

Logistic regression is slow, and the algorithm isn’t as fast as, say, solving linear regression (which uses extremely well-optimized BLAS routines). We could probably make it ~3x faster by rewriting it in C++, maybe a bit more, but not orders of magnitude. Though I do think there are better ways to hook it into our optimizer.


#10

Have just pushed a change that should help the latter stages, too


#11

Using the caching worked great! Thanks for that advice!

Also although my cluster had 100 core nodes, it looks like the tasks must be allocated to individual CPUs, rather than, say, one logistic regression task per node, run itself as a multi-core job. With 286 tasks and 400 core CPUs, then getting a cluster CPU usage around 70% makes sense.

Is there a way I can change the number of tasks? I usually think of breaking up the data set into chunks where the chunk size is set to n variants, and can be change at the beginning of a run.

Thanks!


#12

the number of tasks is determined by the number of partitions. This can be configured using min_partitions on import_vcf, by manipulating the block_size on hl.init, or Spark configuration. All of this stuff is a bit chaotic though… Spark doesn’t make it easy!


#13

Hi, this is really great information. I’m curious how the GNOMAD data was pre-processed for import. With a large number of samples, the vcf.bgz file gets unwieldy and annoying to append samples. Is the best way to split by chromosome (or smaller), merge samples with bcftools, and then load with hl.import_vcf([vcf_1, vcf_2, vcf_3, …])?

Also, I would ideally like to save the data in matrixtable format to prevent needing to reimport.
Is there any way to store the matrixtable such that the number of tasks can be varied later? Would storing with many partitions (say 1500) lead to problems on smaller clusters?

Thanks


#14

The gnomAD VCF was generated by joint-calling individual gVCFs. This task was done by the infrastructure team upstream at Broad; the scientists a (bunch of shards of) vcf.bgz to start.

Merging shards with bcftools and then importing everything into Hail seems like your best bet for now. One of our engineers, Chris, is working on a solution that will let Hail import and store gVCF data in a fashion that permits incremental sample addition, but this won’t be ready for GA for months.

Also, I would ideally like to save the data in matrixtable format to prevent needing to reimport.
Is there any way to store the matrixtable such that the number of tasks can be varied later

It’s much easier to reduce parallelism than increase it, right now. 1500 partitions should be fine on a small cluster – you’ll run into problems when you have:

  • huge partitions, or
  • huge numbers of partitions (~100K and up)