Linear regression hanging - help needed

The problem

I am running a linear regression as the last step in a burden testing algorithm, and it runs normally through a large computation (judging by the numbers describing the stages and the job’s progress through them, e.g. [x + y] / 2586), then it hangs in what appears to be a collect step of the linear regression (again, judging by the number of stages – the progress indicator says [0 + 1] / 17).

When it hangs, memory and CPU drop, and it just runs indefinitely, without returning results or crashing. This same code has worked before, on the same dataset, but stopped working after I added the gnomAD annotations (even after I dropped them post-filtering). I will post more detailed output after rerunning the job – I didn’t capture the original output.

Details below. Thanks!

The method

  1. Using a large MatrixTable of variants (rows) and genomes (columns)
  2. Adding the gnomAD genome sites annotations to the rows
  3. Filtering for rare variants and by “most severe consequence” using the gnomAD annotations, then dropping the annotations to keep row sizes manageable
  4. Annotating with HGDP data to get gene symbols, grouping by those symbols, and filtering out groups with no variants or no gene symbol
  5. Adding covariates (age, sex, ancestry), principal components, and height to the burden table
  6. Running a linear regression

The code

This isn’t the full source code, but just the relevant pieces:

genomes = hl.read_matrix_table('<my_matrix_table>')
genomes = genomes.key_rows_by(genomes.locus, genomes.alleles)
db = hl.experimental.DB(region='us', cloud='gcp')  
genomes = db.annotate_rows_db( genomes, 'gnomad_genome_sites' ) 
genomes = genomes.filter_rows(hl.agg.call_stats(genomes.GT, genomes.alleles).AF[1] < 0.01)
genomes = genomes.filter_rows( (genomes.gnomad_genome_sites.vep.most_severe_consequence == 'start_lost') |\
                               (genomes.gnomad_genome_sites.vep.most_severe_consequence == 'stop_gained') |\
                               (genomes.gnomad_genome_sites.vep.most_severe_consequence == 'splice_donor_variant') |\
                               (genomes.gnomad_genome_sites.vep.most_severe_consequence == 'splice_acceptor_variant') )
genomes = genomes.drop( 'gnomad_genome_sites' )

...

burden = (
    genomes
    .group_rows_by(genomes.vep_info.gene_symbol)
    .aggregate(n_variants = hl.agg.count_where(genomes.GT.n_alt_alleles() > 0))
)
burden = burden.filter_rows( hl.agg.sum( burden.n_variants ) > 0 )
burden = burden.filter_rows( hl.is_missing(burden.gene_symbol), keep=False )

...
covariates = hl.import_table('gs://my_bucket/phenotype.csv', impute=True, delimiter=',').key_by('shipping_id')
covariates = covariates.annotate(sex_binary = hl.if_else( covariates.sex == 'M', 0, 1 ))
covariates = covariates.select(covariates.age, covariates.sex_binary, covariates.hare)
burden = burden.annotate_cols(cov = covariates[burden.s])

...

burden = burden.annotate_cols(pca = pca[burden.s])
burden = burden.annotate_cols(height = height[burden.s])

...

burden_results = hl.linear_regression_rows(
    y=burden.height.Height,
    x=burden.n_variants,
    covariates=[1.0,
                burden.cov.sex_binary,
                burden.pca.PC1,
                burden.pca.PC2,
                burden.pca.PC3])

Software versions

Hail 1.2
Dataproc cluster (1 master/2 workers/up to 1000 secondary workers)
gnomAD genome sites annotations (v3.1.x, I believe - Hail doesn’t tell you which version it’s fetching)

Changing the row key of a matrix table is a rather expensive operation. Is the MT not stored with locus & alleles as the key?

You might see some benefit from mt = mt.checkpoint(...) after the burden creation. That step dramatically reduces the amount of data, so storing the result is not expensive, and it can help isolate memory-intensive parts of the pipeline from one another.
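
For example, a minimal sketch (the bucket path is just a placeholder, not from your post):

# materialize the (much smaller) grouped result once, then work from the written copy
burden = burden.checkpoint('gs://my_bucket/burden-checkpoint.mt', overwrite=True)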

Where does covariates come from? If it is coming from a text table, write and read it before you join it. Hail works most efficiently from its own native formats.
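
Something along these lines (a sketch; the .ht path is a placeholder):

covariates = hl.import_table('gs://my_bucket/phenotype.csv', impute=True, delimiter=',').key_by('shipping_id')
covariates.write('gs://my_bucket/phenotype.ht', overwrite=True)  # native Hail Table format
covariates = hl.read_table('gs://my_bucket/phenotype.ht')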

Are you sure it hangs on the linear regression rows and not on a step after that?

Hail 1.2 isn’t a Hail version, what does hl.version() return?

Hail sorts by the version string and takes the largest value. It’d be good to print that version though. If you’re using GRCh38, you should get 3.1.2.

Changing the row key of a matrix table is a rather expensive operation. Is the MT not stored with locus & alleles as the key?

>>> genomes.row_key # right after importing MT
<StructExpression of type struct{locus: locus<GRCh38>, alleles: array<str>}>

So maybe I can take out the key_rows_by(...) line.

You might see some benefit from mt = mt.checkpoint(...) after the burden creation.

I’ll try that. I’m assuming you mean after the two filtering steps that follow the initial creation; let me know if you meant immediately after the creation.

Where does covariates come from?

It’s imported before those lines I cited:

covariates = hl.import_table('gs://my_bucket/phenotype.csv', impute=True, delimiter=',').key_by('shipping_id')

Do you mean I should read the CSV file in once and write it out to a Hail Table, then use that HT from now on?

Are you sure it hangs on the linear regression rows and not on a step after that?

I’m not totally sure about that. I had the script print out each line just before running it, and the last line it printed out was the call to the linear regression algorithm.

The output looks like this:

burden_results = hl.linear_regression_rows(...

[Stage 13:>                                                         (0 + 1) / 1]

[Stage 15:>                                                         (0 + 1) / 1]

[Stage 16:>                                                      (0 + 8) / 2586]

...

[Stage 16:===================================================>(2585 + 1) / 2586]

[Stage 17:>                                                         (0 + 1) / 1]

Stage 17 is where it hangs. It prints that same line out hundreds of times without incrementing the numbers or the progress bar. There is a line after that, but I’m not sure that’s the one it’s hanging on (debugging tips appreciated, if there’s a way to get more detailed info). The line after:

burden_results.order_by(burden_results.p_value).show()

Hail 1.2 isn’t a Hail version, what does hl.version() return?

Typo. I meant Hail 0.2. More precisely:

>>> hl.version()
'0.2.112-31ceff2fb5fd'

Hail sorts by the version string and takes the largest value. It’d be good to print that version though. If you’re using GRCh38, you should get 3.1.2.

What’s the version function for annotations? I couldn’t find anything in the docs or the structure itself, but I’d assume it’s whatever Hail defaults to (3.1.2, as you said).

A few things I forgot to mention:

  • genomes table size: 6.91 TB, 18.5M rows, 10K cols
  • machine types:
    • master: n1-highmem-8, 100 GB pd-standard primary disk, no GPUs or SSDs
    • primary workers (2): n1-highmem-8, 1000 GB pd-standard primary disk, no GPUs or SSDs
    • secondary workers (up to 1000 allowed, 85 used for last run): n1-highmem-8, 1000 GB pd-standard primary disk, no GPUs or SSDs

Thanks again,
Daniel

How can I get more granular debugging information? As in, what operation is taking place when it hangs? Just seeing the progress of the stage is not super helpful when it hangs like this…

You almost certainly don’t need large disks on the workers. Hail is designed to leverage blob storage. I think we default to 40GB disks.

Yes, after the definition of the burden variable.

Yes.

This suggests that there’s some operation which is trying to localize the entire dataset onto one machine. That’s not going to work.

There is not really a way to get better information. The Hail log file and the worker logs have information that the Hail team has some experience making sense of. In general, this is the trouble with Apache Spark (the library on which Hail depends): it’s really hard to debug what’s happening at run-time.


We need to determine where you lose all your parallelism and collapse to a single partition. Can you modify the script to run this line instead of executing linear_regression_rows:

print(burden.n_partitions())

Can you likewise do that for any table or matrix table that is involved in the construction of burden_results?
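
For instance, something like this (a sketch using the names from your excerpt):

print('genomes:', genomes.n_partitions())
print('burden:', burden.n_partitions())
print('covariates:', covariates.n_partitions())
print('pca:', pca.n_partitions())
print('height:', height.n_partitions())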

Can you also confirm that you have multiple distinct gene_symbols and that there are not, say, millions of rows with the gene_symbol NA? That’s going to cause issues. Presumably you don’t care about those variants, so I’d add a filter_rows that keeps only variants with a defined gene_symbol, i.e. hl.is_defined(genomes.vep_info.gene_symbol).
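
That filter would look roughly like this (a one-line sketch with your field names):

genomes = genomes.filter_rows(hl.is_defined(genomes.vep_info.gene_symbol))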

Is it also possible to share the full script? Nothing you’ve shared seems like an obvious problem (except perhaps the gene_symbol), which suggests to me that the issue lies in the ....

import hail as hl
hl.init()
genomes = hl.read_matrix_table('genomes.mt')
db = hl.experimental.DB(region='us', cloud='gcp')
genomes = db.annotate_rows_db( genomes, 'gnomad_genome_sites' )
genomes = genomes.filter_rows(hl.agg.call_stats(genomes.GT, genomes.alleles).AF[1] < 0.01)
genomes = genomes.filter_rows( (genomes.gnomad_genome_sites.vep.most_severe_consequence == 'start_lost') |\
                               (genomes.gnomad_genome_sites.vep.most_severe_consequence == 'stop_gained') |\
                               (genomes.gnomad_genome_sites.vep.most_severe_consequence == 'splice_donor_variant') |\
                               (genomes.gnomad_genome_sites.vep.most_severe_consequence == 'splice_acceptor_variant') )
genomes = genomes.drop( 'gnomad_genome_sites' )
hgdp = hl.import_table('hgdp_gene_annotations.tsv', impute=True)
hgdp_parsed = hl.parse_variant(hgdp.variant, reference_genome='GRCh38')
hgdp = hgdp.key_by(locus = hgdp_parsed.locus, alleles=hgdp_parsed.alleles).drop('variant')
genomes = genomes.annotate_rows(vep_info = hgdp[genomes.locus, genomes.alleles])
burden = (
    genomes
    .group_rows_by(genomes.vep_info.gene_symbol)
    .aggregate(n_variants = hl.agg.count_where(genomes.GT.n_alt_alleles() > 0))
)
burden = burden.filter_rows( hl.agg.sum( burden.n_variants ) > 0 )
burden = burden.filter_rows( hl.is_missing(burden.gene_symbol), keep=False )
burden = burden.checkpoint(output='burden-table-checkpoint.mt')
covariates = hl.import_table('phenotype.csv', impute=True, delimiter=',').key_by('shipping_id')
covariates = covariates.annotate(sex_binary = hl.if_else( covariates.sex == 'M', 0, 1 ))
covariates = covariates.select(covariates.age, covariates.sex_binary, covariates.hare)
burden = burden.annotate_cols(cov = covariates[burden.s])
pca = hl.import_table('covariates.tsv', impute=True, delimiter='\t').key_by('shipping_id')
burden = burden.annotate_cols(pca = pca[burden.s])
height = hl.import_table('height.csv', impute=True, delimiter=',').key_by('shipping_id')
burden = burden.annotate_cols(height = height[burden.s])
burden_results = hl.linear_regression_rows(
    y=burden.height.Height,
    x=burden.n_variants,
    covariates=[1.0,
                burden.cov.sex_binary,
                burden.pca.PC1,
                burden.pca.PC2,
                burden.pca.PC3],
    )
burden_results.order_by(burden_results.p_value).show()

Hi Dan,

Thanks for keeping the suggestions coming. There’s already a line removing rows with NAs as the gene symbol. This amounts to 3,463/10,441 = 33% of the rows in the HGDP table:

burden = burden.filter_rows( hl.is_missing(burden.gene_symbol), keep=False )

However, I moved the filtering up to the step where I import the HGDP table itself, since until now it has been done after the grouping and aggregation steps. If the NA rows are going to be removed anyway, they may as well be removed prior to a costly grouping & aggregation step, right?
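
Concretely, the import step now looks roughly like this (a sketch of the change, reusing the field names from the script above):

hgdp = hl.import_table('hgdp_gene_annotations.tsv', impute=True)
hgdp = hgdp.filter(hl.is_defined(hgdp.gene_symbol))  # drop NA gene symbols before the join and aggregation
hgdp_parsed = hl.parse_variant(hgdp.variant, reference_genome='GRCh38')
hgdp = hgdp.key_by(locus=hgdp_parsed.locus, alleles=hgdp_parsed.alleles).drop('variant')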

The print(burden.n_partitions()) line prints 1, which supports your statement that it is collapsing to a single partition from multiple. The n_partitions() call runs for several minutes, printing the same status message as the linear regression did in the previous run (last line below):

[Stage 16:===================================================>(2585 + 1) / 2586]

Another thing I tried was to add a call to <table>.show() after every table modification to isolate the cause of the hang. (The only problem with this is that I may be introducing inefficiency by forcing Spark to do each stage in the order written, rather than having some license to rearrange for better efficiency.)

Anyway: The step where it is hanging seems to be:

burden = ( genomes.group_rows_by(genomes.vep_info.gene_symbol).aggregate(n_variants = hl.agg.count_where(genomes.GT.n_alt_alleles() > 0)))

Do you think it would help if I broke this multistep operation into several discrete steps?

Thanks,
Daniel

You’ll definitely have a better time filtering NAs before the group_rows_by; otherwise you’re computing a large aggregation only to throw part of it away.

I’m not sure how you could break that operation up. There’s no way to separate row grouping from aggregation. We have to understand why you have no parallelism after the group_rows_by. How many distinct gene_symbols do you have in genomes?

x = genomes.aggregate_rows(hl.agg.collect_as_set(genomes.vep_info.gene_symbol))
print(len(x))

How many partitions does hgdp have? What about genomes after the read, after the annotate_rows_db, after the drop, and after the annotate_rows(vep_info ...?

Hi Dan,

It’s been a minute since this issue was live, but I wanted to let you know we got it resolved (and explain how). It turns out the problem was that the join from our variants table to the HGDP annotations was producing an empty table, and that empty table was what the group/aggregate and linear regression computations were hanging on. So it was not that the computation was too large; it was the opposite.

Once I joined to an annotations table with matches, the computation ran fairly quickly. I’ve since annotated our variants with the gnomAD and Genebass databases, and both the grouping & aggregation and the linear regression steps ran successfully, so I’m certain it was the HGDP annotations that were the problem.
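
In hindsight, a quick sanity check right after the join would have caught this. Something like the following sketch (not part of the original script):

n_matched = genomes.aggregate_rows(hl.agg.count_where(hl.is_defined(genomes.vep_info)))
print('variant rows with an HGDP match:', n_matched)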

I think this is a bug in either Hail or Spark and can log an issue if you want me to.

Thanks,
Daniel Cotter