The error occurs when using multiprocessing only
Here’s the code:
import os
import findspark
import hail
findspark.init()
import hail as hl
from pyspark import SparkConf
import argparse, sys
from multiprocessing import Pool
import threading
from datetime import datetime
os.environ['PYSPARK_SUBMIT_ARGS'] = "--driver-memory 8g --executor-memory 8g pyspark-shell"
now = str(datetime.now()).replace(" ", "_")
logPath = "LOGS/" + "hail-" + now + ".log"
hl.init(log=logPath)
parser=argparse.ArgumentParser()
parser.add_argument('--file_req', help='Enter the file with requests')
parser.add_argument('--ofile', help="Path to output file")
parser.add_argument('--r_folder', help="Folder with preliminary hail reports")
parser.add_argument("--MTX_folder", help="Folder with mtxes")
parser.add_argument("--mtx_template", help="Template of how mtx filename looks like")
parser.add_argument("--no_proc", help="Number of processes")
parser.add_argument("--max_genomes", help="Here you decide, which lines are going to be shown. The idea is, if there are more genomes than the threshold, those won't be shown")
args=parser.parse_args()
fileWithRequests = args.file_req
fileOverAll = args.ofile #"reportVCFAnalysis.tsv"
pathToMTXFolderAndTemplate = args.MTX_folder + "/" + args.mtx_template #"../MTX2/mtx_from_chr"
fOpen = open(fileOverAll, 'w')
linesHetero = []
linesHomo = []
reports = args.r_folder
if not os.path.exists(reports):
os.makedirs(reports)
f = open(fileWithRequests)
linesF = f.readlines()
hetero = 0
homo = 0
searchIDorLOC = []
IDs = []
IDXs = []
IDYs = []
listFilesX = []
listFilesY = []
maxGenomes = int(args.max_genomes)
def funMP(line):
print("LINE IS", line)
if line.startswith("rs"):
#print("RSID")
searchIDorLOC.append(line.strip())
i = 1
while i < 25:
j = i
if i == 23:
j = "X"
elif i == 24:
j = "Y"
#print("CHECK", pathToMTXFolderAndTemplate)
mtx = hl.read_matrix_table(pathToMTXFolderAndTemplate + str(j) + ".mt")
n = mtx.filter_rows(mtx.rsid == line.strip()).count_rows()
newMtx = mtx.filter_rows(mtx.rsid == line.strip())
fileName = "tsvOutput_" + line.strip() + ".tsv"
fPath = os.path.join(reports, fileName)
#print("CHECK2", fPath, reports)
if n != 0:
newMtx.GT.export(fPath)
i = i + 1
else:
print("ELSE")
searchIDorLOC.append(line.strip())
path = pathToMTXFolderAndTemplate + str(line.strip().split(":")[0].strip()) + ".mt"
print("path", path)
mtx = hl.read_matrix_table(pathToMTXFolderAndTemplate + str(line.strip().split(":")[0].strip()) + ".mt")
newMtx = hl.filter_intervals(mtx, [hl.parse_locus_interval(line.strip(), reference_genome='GRCh37')])
n = newMtx.count_rows()
newMtx = newMtx.key_rows_by(*newMtx.row_key, 'rsid')
#print("CHECK N", n)
#newMtx.rsid.show()
fileName = "tsvOutput_" + str(line.strip()).replace(":", "-") + ".tsv"
fPath = os.path.join(reports, fileName)
print("FPATH IS", fPath)
if n != 0:
print("CREATING NEWMTX", type(newMtx))
newMtx.GT.export(fPath)
'''for l in linesF:
funMP(l)'''
with Pool(int(args.no_proc)) as p:
p.map(funMP, linesF)#'''
for file in os.listdir(reports):
##print(file)
rsid = file.split("_")[-1].split(".")[0]
if file.endswith(".tsv"):
file = reports + file
idxGenomes = 0
line1 = rsid + "\tnumber of heterozygous\t"
line2 = rsid + "\tnumber of homozygous\t"
line3 = ""
line4 = ""
line1middle = ""
line2middle = ""
genomes = []
nucleotides = ""
x = 0
y = 0
if "rs" in file:
with open(file) as fP:
for line in fP:
lineList = line.strip().split("\t")
if "locus" in lineList:
idxGenomes = lineList.index('alleles')
genomes = lineList[idxGenomes+1:]
else:
t = 0
for i in lineList[idxGenomes+1:]:
if "1" and "0" in i:
line1middle = line1middle + " " + genomes[t]
x = x + 1
elif "1/1" in i:
nucleotides = lineList[1].replace("[", "").replace("]", "").replace('"', "").replace(",", " ")
line2middle = line2middle + " " + genomes[t]
y = y + 1
t = t + 1
line1 = line1 + "\t" + str(x) + "\t" + line1middle.strip() + "\n"
line2 = line2 + "\t" + str(y) + "\t" + nucleotides + "\t" + line2middle.strip() + "\n"
if x != 0:
linesHetero.append(line1)
if y != 0:
linesHomo.append(line2)
else:
#print("FILE NON RS", file)
with open(file) as f:
genomes = []
for line in f:
x = 0
y = 0
line1middle = ""
line2middle = ""
if "locus" in line:
idx = line.strip().split("\t").index("alleles")
#print(idx)
#print("L in line", line)
genomes = line.strip().split("\t")[idx+1:]
else:
#print("LINE WITH DATA", line)
line3 = rsid + " " + line.split("\t")[2] + "\tnumber of heterozygous\t"#+
line4 = rsid + " " + line.split("\t")[2] + "\tnumber of homozygous\t" #+ line.split("\t")[1].replace("[", "").replace("]", "").replace('"', "").replace(",", " ")
#print(line3)
#print(line4)
arrGenomes = line.strip().split("\t")[2:]
#print(arrGenomes)
t = 0
for i in arrGenomes:
if "0/1" in i or "1/0" in i:
#print("BOTH", i)
x = x + 1
#print(t)
#print(genomes)
#print(genomes[t])
line1middle = line1middle + " " + genomes[t]
#print("LM", line1middle)
elif "1/1" in i:
line2middle = line2middle + " " + genomes[t]
y = y + 1
t = t + 1
#print(x)
#print(y)
#print("LM1", line1middle)
line3 = line3 + "\t" + str(x) + "\t" +line.split("\t")[1].replace("[", "").replace("]", "").replace('"', "").replace(",", " ").strip() + "\t" + line1middle.strip() + "\n"
#print(line3)
line4 = line4 + "\t" + str(y) + "\t" +line.split("\t")[1].replace("[", "").replace("]", "").replace('"', "").replace(",", " ").strip() + "\t" + line2middle.strip() + "\n"
#print(line4)
if x != 0:
linesHetero.append(line3)
if y != 0:
linesHomo.append(line4)
for i in sorted(list(set(linesHetero))):
if len(i.split("\t")[5].split(" ")) < maxGenomes:
fOpen.write(i)
fOpen.write("\n\n\n\n\n")
for j in sorted(list(set(linesHomo))):
if len(j.split("\t")[5].split(" ")) < maxGenomes:
fOpen.write(j)
Here’s the error:
python3 mtx2rep2.py --file_req Search1.tsv --ofile repTest4.tsv --r_folder REPORTS_TEST8/ --MTX_folder ../MTX2 --mtx_template mtx_from_chr --no_proc 2 --max_genomes 4
2022-03-23 02:26:33 WARN Utils:69 - Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 172.16.162.133 instead (on interface ens33)
2022-03-23 02:26:33 WARN Utils:69 - Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/ani/.local/lib/python3.9/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2022-03-23 02:26:34 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-03-23 02:26:34 WARN Hail:43 - This Hail JAR was compiled for Spark 3.1.2, running with Spark 3.1.3.
Compatibility is not guaranteed.
Running on Apache Spark version 3.1.3
SparkUI available at http://172.16.162.133:4040
Welcome to
__ __ <>__
/ /_/ /__ __/ /
/ __ / _ `/ / /
/_/ /_/\_,_/_/_/ version 0.2.91-44b441376f9a
LOGGING: writing to LOGS/hail-2022-03-23_02:26:32.253557.log
LINE IS 1:500000-737000
ELSE
path ../MTX2/mtx_from_chr1.mt
LINE IS rs12028261
LINE IS rs3131984
LINE IS rs4951928
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
return list(map(*args))
File "/home/ani/Documents/TRUNC/TRUNC/TESTS/mtx2rep2.py", line 64, in funMP
mtx = hl.read_matrix_table(pathToMTXFolderAndTemplate + str(j) + ".mt")
File "<decorator-gen-1352>", line 2, in read_matrix_table
File "/home/ani/.local/lib/python3.9/site-packages/hail/typecheck/check.py", line 577, in wrapper
return __original_func(*args_, **kwargs_)
File "/home/ani/.local/lib/python3.9/site-packages/hail/methods/impex.py", line 2126, in read_matrix_table
for rg_config in Env.backend().load_references_from_dataset(path):
File "/home/ani/.local/lib/python3.9/site-packages/hail/backend/spark_backend.py", line 328, in load_references_from_dataset
return json.loads(self.hail_package().variant.ReferenceGenome.fromHailDataset(self.fs._jfs, path))
File "/home/ani/.local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1644, in __getattr__
raise Py4JError("{0} does not exist in the JVM".format(new_fqn))
py4j.protocol.Py4JError: is.hail.variant does not exist in the JVM
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ani/Documents/TRUNC/TRUNC/TESTS/mtx2rep2.py", line 97, in <module>
p.map(funMP, linesF)#'''
File "/usr/lib/python3.9/multiprocessing/pool.py", line 364, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.9/multiprocessing/pool.py", line 771, in get
raise self._value
py4j.protocol.Py4JError: is.hail.variant does not exist in the JVM
Interestingly, that doesn’t occur on Mac, but occurs on Cent OS and Ubuntu.