Trying to annotate a VCF subset and then filter according to properties

Hello, I am new to Hail, so please forgive my ignorance.
I am using the UKBB, and Hail seems really difficult to use, yet it appears to be the only possible way to complete the following:
I need to isolate a locus within a very large VCF and then annotate it with VEP/ClinVar data.
Subsequently, I need to subset the rows further based on features such as the amino acid position. My code is as follows:

import glob
import hail as hl

loc, aapositions = 'chr21:25880535-26171128', [2011,2010,2018,2146,2150,2149,2021,2141,2080,2140,2147,2144,2020,2032,2113,2172,2146,2044,2137,2149,2077,2149]
chrom, start, end = loc.replace(':',' ').replace('-',' ').split(' ')
aapositions_list = [str(mut)+';'+str(mut) for mut in aapositions]
file = '/mnt/project/Bulk/Exome sequences/Population level exome OQFE variants, pVCF format - final release/ukb23157_c'+str(chrom).replace('chr','')+'*.vcf.gz'
chrom_files=glob.glob(file)
for chrom_file in chrom_files:
    chrom_file = 'file://'+chrom_file
    mt = hl.import_vcf(chrom_file,
            force_bgz=True,
            reference_genome="GRCh38",
            array_elements_required=False)
    # filtering to locus
    region_start = hl.Locus.parse(chrom+':'+str(start), reference_genome='GRCh38')
    region_end = hl.Locus.parse(chrom+':'+str(end), reference_genome='GRCh38')
    # Filter to the specified region
    mt = mt.filter_rows(
        (mt.locus >= region_start) & (mt.locus <= region_end)
    )
    if mt.count_rows()>0:
        print('rows present in '+str(chrom))
        db = hl.experimental.DB(region='us', cloud='aws')
        annotated_mt = db.annotate_rows_db(mt, 'dbNSFP_variants')
        aapos_set = hl.set(aapositions_list)
        filtered_mt = annotated_mt.filter_rows(
            (hl.is_defined(annotated_mt.dbNSFP_variants) &
            hl.any(lambda x: aapos_set.contains(x.aapos),annotated_mt.dbNSFP_variants))
        )
        if filtered_mt.count_rows()>0:
            rows_table = filtered_mt.rows()
            exploded_table = rows_table.explode(rows_table.dbNSFP_variants)
            flattened_table = exploded_table.transmute(**exploded_table.dbNSFP_variants)
        else:
            flattened_table=mt.rows()
        break

However, this code constantly errors out with what I believe is a memory error:
Error summary: ClassNotFoundException: is.hail.backend.spark.SparkBackend$$anon$5$RDDPartition

When it doesn’t do this, the output proves extremely difficult to explore: exporting the results to a file can take a long time and also errors out frequently.
I’m hoping there is something simple in my code that can be spotted to resolve the slowness. Increasing my memory request does not seem to improve things…

Hi @c.clarkson,
That ClassNotFoundException shouldn’t be a memory error and is more likely an installation/environment issue. We’re limited to using Java 8 and 11 only - can you verify the version of Java you’re using?
Best,
Ed

Thanks for getting back to me:

!java --version
openjdk 11.0.13 2021-10-19

That seems fine. How did you install hail? What version are you using?

0.2 is the version that I am using.

It is very strange that sometimes the code takes ~60 minutes, but if I stop it and rerun, it gets through. That error only occurred once.
Also, exporting to a file to take a look at the output takes a really long time, and mt.show() takes even longer…
I’m just looking for an explanation as to why this takes so long. Is there anything I can do to speed it up?

Could it be to do with the fact that I am not caching these queries?
Upon doing a bit of research I have found that caching the data with mt.cache() is possible.
Still, though, the process is incredibly slow for something I would have thought was simple…

I needed to restart my Hail session - that solved the error.
The fact that this process is extremely slow, just to get some simple information, is still a problem.

Hi @c.clarkson

There are a couple of things you could do to your code to improve performance.

The thing to note is that Hail is a lazy language - it won’t materialise results until one is actually requested. For example, operations like filter and annotate defer computation, whereas count, import, and write materialise results.

A general rule of thumb is that:

  • hail types like MatrixTable represent deferred computations, and
  • operations thereon that return vanilla python values (or None) materialise results.
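
As a small illustration of that rule of thumb (the file path here is just a placeholder):

import hail as hl

mt = hl.import_vcf('file:///path/to/some.vcf.gz', force_bgz=True, reference_genome='GRCh38')
mt = mt.filter_rows(mt.locus.contig == 'chr21')   # returns a MatrixTable, so computation is deferred
n = mt.count_rows()                               # returns a plain python int, so the pipeline actually runs here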

What that means for you: lines of code like the following are problematic and perform all of the computation above them twice:

...
mt = mt.filter_rows( ... )
if mt.count_rows() > 0:
   mt = # operations using mt include entire computation

The solution is to use checkpoint to perform all computation up to that point and make that (matrix)table available to more than one place:

...
mt = mt.filter_rows( ... )
mt = mt.checkpoint('checkpoint/filter_rows.mt')
if mt.count_rows() > 0:
   mt = # operations using mt do not include previous computation
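
One thing to keep in mind if you end up calling checkpoint inside your per-file loop: by default it will not overwrite an existing path, so either pass overwrite=True or use a distinct path per file, e.g.

mt = mt.checkpoint('checkpoint/filter_rows.mt', overwrite=True)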

I wouldn’t use cache - I’m pretty sure it’s broken.

Finally, in your code snippet I don’t see what you’re doing with the matrix tables at the end of the loop - are you writing flattened_table somewhere and just omitted it from your snippet?

Hi
Thanks for being so patient with my unclear question.
Yes I’d like to output the flattened table to a tsv file.
This is because I’d like to inspect the output to further see what I can filter on… I recognise that this may not be the best way to explore this package’s capabilities, but the command mt.show()
takes so long that I thought exporting straight to a file might be better?
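For reference, I’m planning to export it with something along these lines (the path is just an example):

flattened_table.export('file:///path/to/output.tsv')
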
I should mention that I am ultimately intending to annotate the filtered VCF with more than just dbNSFP_variants.
I want to filter based on features in a large list of annotations e.g.: annotated_mt = db.annotate_rows_db(mt, 'CADD', 'clinvar_gene_summary','dbNSFP_genes', 'dbNSFP_variants', 'dbSNP_rsid', 'gnomad_exome_sites')

I have updated my script as below:

def get_vars(loc,muts):
    chrom,start,end=loc.replace(':',' ').replace('-',' ').split(' ')
    # aref,apos=loc.split(';')[1].replace(':',' ').split(' ')
    muts_list=[str(mut)+';'+str(mut) for mut in muts]
    chrom_files=get_chroms(chrom)
    # print(chrom_files)
    for chrom_file in chrom_files:
        chrom_file = 'file://'+chrom_file
        mt = hl.import_vcf(chrom_file,
                           force_bgz=True,
                           reference_genome="GRCh38",
                           array_elements_required=False)
        # filtering to locus
        region_start = hl.Locus.parse(chrom+':'+str(start), reference_genome='GRCh38')
        region_end = hl.Locus.parse(chrom+':'+str(end), reference_genome='GRCh38')
        # Filter to the specified region
        mt = mt.filter_rows(
            (mt.locus >= region_start) & (mt.locus <= region_end)
        )
        mt = mt.checkpoint('checkpoint/filter_rows.mt')
        if mt.count_rows()>0:
            print('rows present in '+str(chrom))
            db = hl.experimental.DB(region='us', cloud='aws')
            annotated_mt = db.annotate_rows_db(mt, 'CADD', 'clinvar_gene_summary','dbNSFP_genes', 'dbNSFP_variants', 'dbSNP_rsid', 'gnomad_exome_sites')
            aapos_set = hl.set(muts_list)
            filtered_mt = annotated_mt.filter_rows(
                (hl.is_defined(annotated_mt.dbNSFP_variants) &
                hl.any(lambda x: aapos_set.contains(x.aapos),annotated_mt.dbNSFP_variants))
            )
            if filtered_mt.count_rows()>0:
                rows_table = filtered_mt.rows()
                exploded_table = rows_table.explode(rows_table.dbNSFP_variants)
                flattened_table = exploded_table.transmute(**exploded_table.dbNSFP_variants)
            else:
                flattened_table=mt.rows()
            break
    return flattened_table

Having run it again, it is still taking a really long time, and the time varies between rounds of restarting and running the code…
Is that normal? All I really want is the allele frequency of amino-acid-changing variants, which I would have thought was a simple thing, but it is proving really difficult.

Hi @c.clarkson,

Something doesn’t sound right if exporting to TSV is faster than show.

How large are these VCFs, generally? Do you know the range of loci? Importing VCFs can be quite an expensive operation, depending on how large the file is. Furthermore, since they’re not indexed, we need to scan through the entire file until we reach the loci of interest (your region start and end). If you converted your VCFs into MatrixTables and saved them to disk, we’d do an index lookup on these sites and you’d get a good speed-up between runs.
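
A rough sketch of what I mean (the paths are illustrative, and I’m using parse_locus_interval / filter_intervals since they can take advantage of the MatrixTable’s index):

# one-off conversion: write the imported VCF out as a native MatrixTable
mt = hl.import_vcf('file:///path/to/one_of_your_vcfs.vcf.gz',
                   force_bgz=True,
                   reference_genome='GRCh38',
                   array_elements_required=False)
mt.write('file:///path/to/ukb23157_c21.mt', overwrite=True)

# subsequent runs: read the MatrixTable back and filter to your region via an index lookup
mt = hl.read_matrix_table('file:///path/to/ukb23157_c21.mt')
interval = hl.parse_locus_interval('chr21:25880535-26171128', reference_genome='GRCh38')
mt = hl.filter_intervals(mt, [interval])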

Also, my suggestion for using checkpoint was slightly broader - you should modify the following lines too, to avoid repeated joins with the annotation databases.

filtered_mt = annotated_mt.filter_rows( ... )
if filtered_mt.count_rows()>0:
  rows_table = filtered_mt.rows()
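
i.e. something like (the checkpoint path is just an example):

filtered_mt = annotated_mt.filter_rows( ... )
filtered_mt = filtered_mt.checkpoint('checkpoint/annotated_filtered.mt', overwrite=True)
if filtered_mt.count_rows()>0:
  rows_table = filtered_mt.rows()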

Best,
Ed