Hello - I’m benchmarking a GWAS script, which I intend to then parallelize using Slurm. I’m benchmarking 4 jobs with identical resources. One job typically works fine, but the others fail. Below is the stack trace:
Running on Apache Spark version 2.4.1
SparkUI available at http://cn1735:4040
Welcome to
__ __ <>__
/ /_/ /__ __/ /
/ __ / _ `/ / /
/_/ /_/\_,_/_/_/ version 0.2.64-1ef70187dc78
LOGGING: writing to /data/NIMH_scratch/dmoracze/gwas/logs/gwas_idx0_step24.log
2022-01-19 13:13:52 Hail: INFO: Number of BGEN files parsed: 22
2022-01-19 13:13:52 Hail: INFO: Number of samples in BGEN files: 487409
2022-01-19 13:13:52 Hail: INFO: Number of variants across all BGEN files: 93095623
2022-01-19 13:16:19 Hail: INFO: Coerced sorted dataset
2022-01-19 13:16:22 Hail: INFO: linear_regression_rows: running on 11224 samples for 24 response variables y,
with input variable x, and 1 additional covariate...
ERROR:root:Exception while sending command.
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1159, in send_command
raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 985, in send_command
response = connection.send_command(command)
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1164, in send_command
"Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
Traceback (most recent call last):
File "/data/Ukbb_DSST/DSST/dmoracze/gwas/ukbb_idp_gwas.py", line 88, in <module>
Main()
File "/data/Ukbb_DSST/DSST/dmoracze/gwas/ukbb_idp_gwas.py", line 84, in Main
hl_wf(settings)
File "/data/Ukbb_DSST/DSST/dmoracze/gwas/ukbb_idp_gwas.py", line 60, in hl_wf
block_size=128)
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/decorator.py", line 232, in fun
return caller(func, *(extras + args), **kw)
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/hail/typecheck/check.py", line 577, in wrapper
return __original_func(*args_, **kwargs_)
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/hail/methods/statgen.py", line 370, in linear_regression_rows
return ht_result.persist()
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/decorator.py", line 232, in fun
return caller(func, *(extras + args), **kw)
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/hail/typecheck/check.py", line 577, in wrapper
return __original_func(*args_, **kwargs_)
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/hail/table.py", line 1870, in persist
return Env.backend().persist_table(self, storage_level)
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/hail/backend/spark_backend.py", line 286, in persist_table
return Table._from_java(self._jbackend.pyPersistTable(storage_level, self._to_java_table_ir(t._tir)))
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/hail/backend/py4j_backend.py", line 16, in deco
return f(*args, **kwargs)
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/protocol.py", line 336, in get_return_value
format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o1.pyPersistTable
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35600)
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1067, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35600)
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1067, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35600)
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1067, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35600)
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 929, in _get_connection
connection = self.deque.pop()
IndexError: pop from an empty deque
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/dmoracze/miniconda3/lib/python3.7/site-packages/py4j/java_gateway.py", line 1067, in start
self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused
Do you have an idea of what might be causing this issue?
Here is my code, adapted from recommendations in this thread.
import argparse
import os, sys
from glob import glob

import hail as hl
import pandas as pd
def _autosomal_bgen_files(paths):
    """Return the chromosome 1-22 BGEN paths from *paths*, in chromosome order.

    Expects file names of the form ``ukb_imp_chr<label>_v3.bgen``; any path
    whose label is not a number between 1 and 22 (e.g. X or XY) is dropped.
    """
    prefix, suffix = 'ukb_imp_chr', '_v3.bgen'
    numbered = []
    for path in paths:
        label = os.path.basename(path)[len(prefix):-len(suffix)]
        if label.isdecimal() and 1 <= int(label) <= 22:
            numbered.append((int(label), path))
    return [path for _, path in sorted(numbered)]


def hl_wf(settings):
    """Run one batch of the GWAS with Hail and export the betas.

    Initialises Hail, imports the chromosome 1-22 BGEN files, keeps variants
    whose MFI ``info`` value is above 0.1 and samples listed in the subject
    file, runs ``hl.linear_regression_rows`` of a batch of phenotypes on
    dosage, and exports ``rsid``/``beta`` to a ``.tsv.bgz`` file.

    Args:
        settings: Mapping with the entries ``'idx'`` (batch index) and
            ``'step'`` (number of phenotypes in the batch, an int).

    Raises:
        KeyError: If the ``SLURM_JOB_ID`` environment variable is not set.
    """
    # NOTE(review): the driver JVM is asked for 128g here; confirm that each
    # SLURM job's memory allocation actually covers that when several jobs
    # run side by side.
    hl.init(spark_conf={'spark.driver.memory': '128g'},
            tmp_dir=f'/lscratch/{os.environ["SLURM_JOB_ID"]}',
            log=f'/data/NIMH_scratch/dmoracze/gwas/logs/gwas_idx{settings["idx"]}_step{settings["step"]}.log')

    # get bgen files together
    top_dir = '/data/Ukbb_DSST/genetic/ukbgene'
    # glob() returns matches in arbitrary order, so slicing the first 22
    # entries could pick up a non-autosomal file; select chr1-22 explicitly.
    bgen_files = _autosomal_bgen_files(glob(f'{top_dir}/bgen/ukb_imp_chr*_v3.bgen'))
    sample_file = f'{top_dir}/ukb22875_imp_chr1_22_v3_s487320.sample'  # should be same for all chromosomes
    mfi_ht = '/data/Ukbb_DSST/genetic/ukbgene/mfi_new/gwas_mfi.ht'

    # subject list: one ID per line; blank lines (such as the one produced by
    # a trailing newline) and surrounding whitespace are ignored
    subjs_file = '/data/Ukbb_DSST/DSST/dmoracze/gwas/good_subjs.csv'
    with open(subjs_file) as f:
        subjs = {line.strip() for line in f if line.strip()}

    # read in mfi files and key them by (locus, alleles) for the row join
    mfi_info = hl.read_table(mfi_ht)
    mfi_info = mfi_info.key_by(locus=mfi_info.locus, alleles=mfi_info.alleles)

    # read in the bgen files
    genotype_info = hl.import_bgen(bgen_files, entry_fields=['dosage'],
                                   sample_file=sample_file, n_partitions=18000)

    # add mfi to variants, then filter variants and samples
    geno_mt = genotype_info.annotate_rows(mfi=mfi_info[genotype_info.row_key])
    geno_mt = geno_mt.filter_rows(geno_mt.mfi.info > 0.1)
    # a typed set literal gives set membership and stays valid when empty
    keep_samples = hl.literal(subjs, 'set<str>')
    geno_subset_mt = geno_mt.filter_cols(keep_samples.contains(geno_mt.s))

    # phenotype columns [pheno_start, pheno_end) make up this batch
    batch_size = settings['step']
    batch_index = settings['idx']
    pheno_start = batch_size * batch_index
    pheno_end = pheno_start + batch_size

    # read in the pheno matrix table and keep only this batch's columns
    phenos_mt = hl.read_matrix_table('/data/Ukbb_DSST/DSST/dmoracze/gwas/idps_i_deconf.mt')
    phenos_mt = phenos_mt.add_col_index()
    phenos_mt = phenos_mt.filter_cols((phenos_mt.col_idx >= pheno_start) & (phenos_mt.col_idx < pheno_end))
    # NOTE(review): hl.agg.collect does not document a guaranteed element
    # order - confirm phenos[i] lines up with the intended phenotype column.
    phenos_table = phenos_mt.annotate_rows(phenos=hl.agg.collect(phenos_mt.x)).rows()
    # presumably the phenotype rows are keyed by sample ID; verify against the .mt
    geno_subset_mt = geno_subset_mt.annotate_cols(phenos=phenos_table[geno_subset_mt.s].phenos)

    # do the gwas
    batch_results = hl.linear_regression_rows(
        [geno_subset_mt.phenos[i] for i in range(batch_size)],
        geno_subset_mt.dosage,
        covariates=[1.0],
        pass_through=[geno_subset_mt.rsid, geno_subset_mt.mfi.chrom],
        block_size=128)

    # save the results, dropping rows whose first beta is NaN
    res = batch_results.select('rsid', 'beta')
    res = res.filter(hl.is_nan(res.beta[0]), keep=False)
    res.export(f'/data/NIMH_scratch/dmoracze/gwas/tsvs/gwas_betas_idx{settings["idx"]}_step{settings["step"]}.tsv.bgz')
    hl.stop()
def build_parser():
    """Build the command-line parser for the GWAS batch script.

    Returns:
        An ``argparse.ArgumentParser`` with two required positional integer
        arguments: ``idx`` and ``step``.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('idx', type=int,
                        help='Index to subset IDP table.')
    parser.add_argument('step', type=int,
                        help='How many IDPs do you want?')
    return parser
def Main():
    """Parse the command-line arguments and run the GWAS workflow with them."""
    cli_args = build_parser().parse_args()
    # hl_wf reads its options from a plain dict, so unwrap the namespace
    hl_wf(vars(cli_args))
# Run the workflow only when this file is executed as a script, not on import.
if __name__ == '__main__':
    Main()