I’m using Hail to perform a GWAS for some expression QTLs on Google Cloud. The code 1) performs a series of regressions with linreg_multi_pheno, 2) after each block of phenotypes, unions the variants_table output into a running results KeyTable, and 3) after all regressions, repartitions to 100 partitions and exports the resulting KeyTable with parallel=True.
I’m having two problems. First, writing the results is taking much longer than the regressions. For the first run, regressions finished within an hour while writing the results took the next 9 hours. Any ideas about how to speed this up?
Second, while writing the output files, the code threw the out-of-memory error pasted below. Only about 750 MiB of KeyTable results had actually been written at that point. Possibly relevant: the export had previously caused a stack overflow, which I fixed by increasing the stack size.
[Stage 774:=====> (2535 + 1226) / 22371]
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007fc4b8b80000, 2197815296, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 2197815296 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /tmp/b5dc094b-b3a6-440a-b5f9-cb5f83b93c56/hs_err_pid4189.log
ERROR: (gcloud.dataproc.jobs.wait) Job [b5dc094b-b3a6-440a-b5f9-cb5f83b93c56] entered state [ERROR] while waiting for [DONE].
Hey @DylanVrana, can you share more of the actual hail code you’re executing?
Are you writing to a Google Storage bucket?
It would be great if we could get that log file, but I assume the cluster has already been shut down.
How big do you expect this results KeyTable to be, roughly?
You might also try writing each individual output to Google Storage, then reading them back, joining them, and writing the combined result. We sometimes see that shuffles can cause memory issues, though I’ve never seen this kind of error message from a shuffle.
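A minimal sketch of that pattern, assuming Hail 0.1’s KeyTable API; build_gene_table, the HailContext handle hc, and the bucket paths are hypothetical stand-ins, not from the original code:

# Sketch only: write each per-gene KeyTable to Google Storage as it is produced,
# then read the pieces back, union them, and export once.
per_gene_paths = []
for g in genes:
    vt = build_gene_table(g)                    # hypothetical helper standing in for the per-gene pipeline
    path = 'gs://my-bucket/eqtl/%s.kt' % g      # hypothetical output path
    vt.write(path, overwrite=True)              # forces this gene's work to run now, then frees it
    per_gene_paths.append(path)

# Second, cheaper step: combine the written tables and export the result.
tables = [hc.read_table(p) for p in per_gene_paths]  # hc is the existing HailContext
res = tables[0].union(*tables[1:])
res = res.repartition(100)
res.export('gs://my-bucket/eqtl/combined.tsv', parallel=True)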
Here’s a chunk of the code; the last two lines are the most relevant. I am writing to a Google Storage bucket. I expect the KeyTable to be, at most, 2-3 GiB after writing.
# Annotate and filter the vds
vds_block = vds_tissue.annotate_samples_table(junction_reads, root="sa.phenos")
for g in genes: # Perform linear regression on one gene (all of whose junction phenotypes share the same missingness pattern) and append the result to the output table
    vds_gene = vds_block.filter_intervals(hail.Interval.parse(coords[g]))
    genes[g] = list(genes[g])
    n_obs = vds_gene.query_samples('samples.map(s => %s).filter(x => x.isDefined()).count()' % genes[g][0])
    if n_obs <= n_obs_filter: continue
    vds_gene = vds_gene.linreg_multi_pheno(genes[g], min_ac=2, min_af=0.01)
    # Annotate with n_obs and junction names
    vds_gene = vds_gene.annotate_global('global.n_obs', n_obs, hail.TLong()) \
        .annotate_global('global.junctions', genes[g], hail.TArray(hail.TString())) \
        .annotate_variants_expr('va.n_obs = global.n_obs') \
        .annotate_variants_expr('va.junctions = global.junctions')
    vt = vds_gene.variants_table().flatten()
    vt_rename = { 'EffectAllele' : 'EffectAllele', 'va.rsid' : 'SNP', 'linreg_all' : 'linreg.all', 'va.n_obs' : 'n_obs', 'linreg_all.beta' : 'beta', 'linreg_all.p' : 'p', 'linreg_all.junction' : 'Junction' }
    # Combine the per-junction p-values and betas into one array of structs, one element per junction
    vt = vt.annotate('linreg_all = range(`va.linreg.pval`.length()).map(i => {p: `va.linreg.pval`[i], beta: `va.linreg.beta`[i], junction: `va.junctions`[i].split("\`")[1]})') \
        .annotate('EffectAllele = if (v.isBiallelic()) v.altAllele().alt else "multiallelic"')
    # Keep only the columns of interest, explode to one row per junction, then flatten and rename
    vt = vt.drop(filter(lambda c: c not in vt_rename, vt.columns)).explode(['linreg_all']).flatten().rename(vt_rename)
    vt = vt.filter('!isnan(beta) && p < %s' % p_filter) # Remove rows w/ nan betas and uninteresting p-values
    if res is None: res = vt
    else: res = res.union(vt)
res = res.repartition(100) # repartition returns a new KeyTable, so reassign the result
res.export(outFile.replace('*', t), parallel=True)
@danking, writing once per gene (or block of genes) is currently working, so this is resolved, although I’m still curious about why this error happened.
Tim thinks that too many things were being “broadcast” (a technical Spark term) at once, in particular the phenotype information. By writing intermediate results, you forced these into separate steps, thus decreasing total memory usage.
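For reference, a minimal sketch of the per-gene (or per-block) write that resolved this, again with build_gene_table and the output paths as hypothetical stand-ins for the loop body shown earlier:

# Sketch only: export each gene's results as soon as they are computed, instead of
# accumulating one large union. Each export runs as its own job, so only that gene's
# broadcast data (e.g. phenotype annotations) needs to be held in memory at a time.
for g in genes:
    vt = build_gene_table(g)                    # hypothetical helper
    vt.export('gs://my-bucket/eqtl/%s.tsv' % g) # hypothetical per-gene output path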