Performance of writing matrixtable on 0.2


#1

I’m currently joining a number of VCFs together and saving as matrixtable for later use on gcloud using Nealelab/cloudtools. On a test dataset this operation took a surprisingly long time (almost 10 hours for 90 19 mil site VCFs on the basic 3 machine setup).

I also see Hail: INFO: Coerced sorted dataset appear almost constantly on most operations. Is this possibly causing performance problems? Is there any way to perform the sort at load?


#2

I’m currently joining a number of VCFs together

can you share the python code you’re running?


#3

Yeah, thanks for the quick reply!

import hail as hl
hl.init()

from google.cloud import storage

client = storage.Client()
bucket = client.get_bucket('vcf-bucket')

mt = hl.read_matrix_table('gs://hail-data/initial.mt')
for b in bucket.list_blobs(prefix='vcf-prefixes'):
    mt_temp = hl.import_vcf('gs://vcf-bucket/'+b.name,force_bgz=True)
    mt = mt.union_cols(mt_temp)
mt.write('gs://hail-data/output.mt')

#4

Aha. Yeah, this won’t really work as written - union_cols requires copying the left and the right data, so this is actually quadratic in the number of VCFs when done iteratively as you’re trying!

Another likely problem is that union_cols is filtering to common sites – if your VCFs are single-sample VCFs that record only the variants each sample had a mutation at, then you’ll end up with no sites at the end (every site will be hom-ref in somebody). Is this the right categorization of your data?

You could get down to N log2(N) complexity by unioning in a tree structure, but this will also be pretty slow. The right solution is a gVCF importer, which one of our engineers, Chris, is working on. This won’t be ready for several months, though.

It may also be possible to use to_matrix_table, which will be linear, but will require shuffling all the data over the network, which could be very slow.


#5

Thanks, the common sites issue isn’t a problem as these are gVCF-like, ie all the same sites are emitted in VCF.

Just to make sure though, I looked at the actual spark job times, and it looks like all the reads and joins are quick, but the write itself takes most of the time. Is the reason the write is so costly because it waits to perform all the joins when it does the write?


#6

Yeah, the spark stages won’t be separated well by python command. Generally all of the compute will be saved until the last stage (write, aggregate, count, etc)


#7

Just for grins, what would happen if a list mts contained all of the loaded VCFs as MatrixTable objects, and one ran this snippet?

union_expr = 'mts[0]' + ''.join([f'.union_cols(mts[{i}])' for i in range(1, len(mts))])
final_mt = eval(union_expr)

This is logically equivalent to:

final_mt = mts[0].union_cols([mts[1])....union_cols(mts[len(mts)])

#8

in a little clearer (identical) python:

mt = mts[0]
for mt_ in mts[1:]:
    mt = mt.union_cols(mt_)

This will be really bad to point of infeasibility. union_cols needs to copy the left + right data, so this code is quadratic in the number of VCFs.

Incremental sample addition is the #1 most requested feature in Hail, and one our new engineers is working on that for his first major project. We expect that the solution will be to load gVCFs (with ref blocks) and produce mergeable sparse matrix tables from them, then merge these together hierarchically.

This won’t be ready for several months, though.

If the VCFs all have the same variants, then it might be possible to write a linear union_cols algorithm in the shorter term.


#9

for now you can also get down to O(N * log2 N) doing a tree union:

mts_ = mts[:]

iteration = 0
while (len(mts_) > 1):
    iteration += 1
    print(f'iteration {iteration}')
    tmp = []
    for i in range(0, len(mts_), 2):
        tmp.append(hl.MatrixTable.union_cols(mts_[i:i+2]))
    mts_ = tmp[:]
[final_mt] = mts_

#10

also probably pretty slow though