Final task/partition is hanging

Hi Hail team,

I am running this code:

```python
import argparse
import logging
from collections import Counter

import hail as hl
from gnomad_hail import *
from gnomad_hail.utils import *
from generate_frequency_data import *

from sample_qc import *

logging.basicConfig(level=logging.INFO, format='%(asctime)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
logger = logging.getLogger("gnomAD_frequency_data")


def main(args):
    hl.init(log='/frequency_data_generation.log')

    logger.info("Importing split sparse MT and metadata table...")
    mt = get_full_mt()
    meta_ht = hl.read_table("gs://gnomad/metadata/genomes_v3/")

    if args.test:
        logger.info("Filtering to chr20")
        mt = hl.filter_intervals(mt, [hl.parse_locus_interval('chr20', reference_genome='GRCh38')])

    logger.info("Annotating split sparse MT with metadata...")
    mt = mt.annotate_cols(meta=hl.struct(pop=meta_ht[mt.s].pop,
                                         ))  # additional meta fields truncated in the original post

    logger.info("Selecting only entry fields needed for densify...")
    mt = mt.select_entries('GT', 'GQ', 'AD', 'DP', 'END')

    logger.info("Densify-ing...")
    mt = hl.experimental.densify(mt)
    mt = mt.filter_rows(hl.len(mt.alleles) > 1)

    logger.info("Annotating genotypes with adj...")
    mt = annotate_adj(mt)

    logger.info("Adjusting male genotypes at non-PAR...")
    mt = adjust_sex_ploidy(mt, )  # remaining arguments truncated in the original post

    logger.info("Generating frequency data...")
    mt = mt.filter_cols((hl.len(mt.meta.sample_filters) == 0) & mt.meta.releasable)
    mt = mt.filter_rows(hl.agg.sum(mt.GT.is_non_ref()) > 0)
    ht, sample_table = generate_frequency_data(mt)

    logger.info("Writing out frequency data...")
    ht.write("gs://gnomad-mwilson/gnomad_freq/", overwrite=args.overwrite)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--test', help='Test on chr20', action='store_true')
    parser.add_argument('--overwrite', help='Overwrites existing files', action='store_true')
    args = parser.parse_args()
    main(args)
```

I am submitting it with the test arg, so the code subsets the split sparse MT to chr20 before continuing. I see long-running jobs and lost workers before it even gets to the densify step. I am running this on 200 preemptible highmem-8 workers with a 500GB master and two 500GB workers, with an autoscaling policy as well. There are a large number of failing tasks and then lost nodes.

Spark still says the task is running.

Have you run this multiple times and gotten the same thing each time? Sometimes you just get a freak slow node. One thing to try is to turn on speculation using --properties 'spark.speculation=true' on job submission (@tpoterba - might be time to turn that on by default?).

Which stage was it that was failing? (Can you expand the +details?) If it’s the final write of the frequency data, that’s a good sign (if it’s something back in densify, it might be tougher).

Probably not the issue, but just as an aside: every time you write meta_ht[mt.s], it does a separate join. Storing that as a Python variable (meta_join = meta_ht[mt.s]) and then using it in the struct above will be a bit more efficient.
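To make that tip concrete, here is a toy pure-Python sketch (not Hail — the CountingTable class and the field names are invented for illustration) of why caching the lookup helps: each meta_ht[mt.s]-style access is modeled as a counted dictionary lookup, standing in for an expensive join.

```python
class CountingTable:
    """Counts how many times a row is looked up (a stand-in for a join)."""
    def __init__(self, rows):
        self.rows = rows
        self.lookups = 0

    def __getitem__(self, key):
        self.lookups += 1
        return self.rows[key]


meta = CountingTable({"s1": {"pop": "nfe", "releasable": True}})

# Naive pattern: one lookup ("join") per field referenced.
naive = {"pop": meta["s1"]["pop"], "releasable": meta["s1"]["releasable"]}
naive_lookups = meta.lookups

# Suggested pattern: look up once, reuse the row for every field.
meta.lookups = 0
row = meta["s1"]
better = {"pop": row["pop"], "releasable": row["releasable"]}

print(naive_lookups, meta.lookups)  # 2 1
```

Same resulting annotation, half the lookups; in Hail the saving is per-join rather than per-dict-access, but the shape of the fix is the same.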

I did run into a very slow task in an earlier step, I think just reading in a partition: all tasks completed in under 10 minutes, and one hung for about an hour before completing. But this is the first time I’ve run the code, so maybe it is an incredibly slow node. Expanding details: (screenshot of the Spark stage details). This ran overnight like this, so I’m going to kill it and restart it with the Spark flag and the more efficient join you suggested.

Awesome! That’s the final write step, definitely just try again (with speculation for good measure).

Excellent, thanks Konrad!