py4j.protocol.Py4JError: is.hail.variant does not exist in the JVM

The error occurs when using multiprocessing only

Here’s the code:

import os
import findspark
import hail

findspark.init()
import hail as hl
from pyspark import SparkConf

import argparse, sys
from multiprocessing import Pool
import threading
from datetime import datetime


os.environ['PYSPARK_SUBMIT_ARGS'] = "--driver-memory 8g --executor-memory 8g pyspark-shell"
now = str(datetime.now()).replace(" ", "_")
logPath = "LOGS/" + "hail-" + now + ".log"
hl.init(log=logPath)
parser=argparse.ArgumentParser()
parser.add_argument('--file_req', help='Enter the file with requests')
parser.add_argument('--ofile', help="Path to output file")
parser.add_argument('--r_folder', help="Folder with preliminary hail reports")
parser.add_argument("--MTX_folder", help="Folder with mtxes")
parser.add_argument("--mtx_template", help="Template of how mtx filename looks like")
parser.add_argument("--no_proc", help="Number of processes")
parser.add_argument("--max_genomes", help="Here you decide, which lines are going to be shown. The idea is, if there are more genomes than the threshold, those won't be shown")
args=parser.parse_args()

fileWithRequests = args.file_req
fileOverAll = args.ofile  #"reportVCFAnalysis.tsv"
pathToMTXFolderAndTemplate = args.MTX_folder + "/" + args.mtx_template       #"../MTX2/mtx_from_chr"
fOpen = open(fileOverAll, 'w')
linesHetero = []
linesHomo = []
reports = args.r_folder
if not os.path.exists(reports):
        os.makedirs(reports)

f = open(fileWithRequests)
linesF = f.readlines()
hetero = 0
homo = 0
searchIDorLOC = []
IDs = []
IDXs = []
IDYs = []
listFilesX = []
listFilesY = []

maxGenomes = int(args.max_genomes)
def funMP(line):
        print("LINE IS", line)
        if line.startswith("rs"):
            #print("RSID")
            searchIDorLOC.append(line.strip())
            i = 1
            while i < 25:
                j = i
                if i == 23:
                        j = "X"
                elif i == 24:
                        j = "Y"
                #print("CHECK", pathToMTXFolderAndTemplate)
                mtx = hl.read_matrix_table(pathToMTXFolderAndTemplate + str(j) + ".mt")
                n = mtx.filter_rows(mtx.rsid == line.strip()).count_rows()
                newMtx = mtx.filter_rows(mtx.rsid == line.strip())
                fileName = "tsvOutput_" + line.strip() + ".tsv"
                fPath = os.path.join(reports, fileName)
                #print("CHECK2", fPath, reports)
                if n != 0:
                        newMtx.GT.export(fPath)
                i = i + 1

        else:
            print("ELSE")
            searchIDorLOC.append(line.strip())
            path = pathToMTXFolderAndTemplate + str(line.strip().split(":")[0].strip()) + ".mt"
            print("path", path)

            mtx = hl.read_matrix_table(pathToMTXFolderAndTemplate + str(line.strip().split(":")[0].strip()) + ".mt")
            newMtx = hl.filter_intervals(mtx, [hl.parse_locus_interval(line.strip(), reference_genome='GRCh37')])
            n = newMtx.count_rows()
            newMtx = newMtx.key_rows_by(*newMtx.row_key, 'rsid')
            #print("CHECK N", n)
            #newMtx.rsid.show()
            fileName = "tsvOutput_" + str(line.strip()).replace(":", "-") + ".tsv"
            fPath = os.path.join(reports, fileName)
            print("FPATH IS", fPath)

            if n != 0:
                print("CREATING NEWMTX", type(newMtx))
                newMtx.GT.export(fPath)
'''for l in linesF:
    funMP(l)'''

with Pool(int(args.no_proc)) as p:
        p.map(funMP, linesF)#'''

for file in os.listdir(reports):
        ##print(file)
        rsid = file.split("_")[-1].split(".")[0]
        if file.endswith(".tsv"):
                file = reports + file
                idxGenomes = 0
                line1 = rsid + "\tnumber of heterozygous\t"
                line2 = rsid + "\tnumber of homozygous\t"
                line3 = ""
                line4 = ""
                line1middle = ""
                line2middle = ""
                genomes = []
                nucleotides = ""
                x = 0
                y = 0

                if "rs" in file:
                    with open(file) as fP:
                        for line in fP:
                                lineList = line.strip().split("\t")
                                if "locus" in lineList:
                                        idxGenomes = lineList.index('alleles')
                                        genomes = lineList[idxGenomes+1:]
                                else:
                                        t = 0
                                        for i in lineList[idxGenomes+1:]:
                                                if "1" and "0" in i:
                                                        line1middle = line1middle + " " + genomes[t]
                                                        x = x + 1
                                                elif "1/1" in i:
                                                        nucleotides = lineList[1].replace("[", "").replace("]", "").replace('"', "").replace(",", " ")
                                                        line2middle = line2middle + " " + genomes[t]
                                                        y = y + 1
                                                t = t + 1
                    line1 = line1 + "\t" + str(x) + "\t" + line1middle.strip() + "\n"
                    line2 = line2 + "\t" + str(y) + "\t" + nucleotides + "\t" + line2middle.strip() + "\n"

                    if x != 0:
                        linesHetero.append(line1)
                    if y != 0:
                        linesHomo.append(line2)
                else:
                    #print("FILE NON RS", file)
                    with open(file) as f:

                        genomes = []
                        for line in f:
                            x = 0
                            y = 0
                            line1middle = ""
                            line2middle = ""
                            if "locus" in line:
                                idx = line.strip().split("\t").index("alleles")
                                #print(idx)
                                #print("L in line", line)
                                genomes = line.strip().split("\t")[idx+1:]
                            else:
                                #print("LINE WITH DATA", line)
                                line3 = rsid + " " + line.split("\t")[2] + "\tnumber of heterozygous\t"#+
                                line4 = rsid + " " + line.split("\t")[2] + "\tnumber of homozygous\t" #+ line.split("\t")[1].replace("[", "").replace("]", "").replace('"', "").replace(",", " ")
                                #print(line3)
                                #print(line4)
                                arrGenomes = line.strip().split("\t")[2:]
                                #print(arrGenomes)
                                t = 0
                                for i in arrGenomes:
                                    if "0/1" in i or "1/0" in i:
                                        #print("BOTH", i)

                                        x = x + 1
                                        #print(t)
                                        #print(genomes)
                                        #print(genomes[t])
                                        line1middle = line1middle + " " + genomes[t]
                                        #print("LM", line1middle)
                                    elif "1/1" in i:
                                        line2middle = line2middle + " " + genomes[t]
                                        y = y + 1
                                    t = t + 1
                                #print(x)
                                #print(y)
                                #print("LM1", line1middle)
                                line3 = line3 + "\t" + str(x) + "\t" +line.split("\t")[1].replace("[", "").replace("]", "").replace('"', "").replace(",", " ").strip() + "\t" + line1middle.strip() + "\n"
                                #print(line3)
                                line4 = line4 + "\t" + str(y) + "\t" +line.split("\t")[1].replace("[", "").replace("]", "").replace('"', "").replace(",", " ").strip() + "\t" + line2middle.strip() + "\n"
                                #print(line4)
                                if x != 0:
                                    linesHetero.append(line3)
                                if y != 0:
                                    linesHomo.append(line4)







for i in sorted(list(set(linesHetero))):
    if len(i.split("\t")[5].split(" ")) < maxGenomes:
        fOpen.write(i)
fOpen.write("\n\n\n\n\n")
for j in sorted(list(set(linesHomo))):
    if len(j.split("\t")[5].split(" ")) < maxGenomes:
        fOpen.write(j)

Here’s the error:

python3 mtx2rep2.py --file_req Search1.tsv --ofile repTest4.tsv --r_folder REPORTS_TEST8/ --MTX_folder ../MTX2 --mtx_template mtx_from_chr --no_proc 2 --max_genomes 4
2022-03-23 02:26:33 WARN  Utils:69 - Your hostname, ubuntu resolves to a loopback address: 127.0.1.1; using 172.16.162.133 instead (on interface ens33)
2022-03-23 02:26:33 WARN  Utils:69 - Set SPARK_LOCAL_IP if you need to bind to another address
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.spark.unsafe.Platform (file:/home/ani/.local/lib/python3.9/site-packages/pyspark/jars/spark-unsafe_2.12-3.1.3.jar) to constructor java.nio.DirectByteBuffer(long,int)
WARNING: Please consider reporting this to the maintainers of org.apache.spark.unsafe.Platform
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2022-03-23 02:26:34 WARN  NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-03-23 02:26:34 WARN  Hail:43 - This Hail JAR was compiled for Spark 3.1.2, running with Spark 3.1.3.
  Compatibility is not guaranteed.
Running on Apache Spark version 3.1.3
SparkUI available at http://172.16.162.133:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.91-44b441376f9a
LOGGING: writing to LOGS/hail-2022-03-23_02:26:32.253557.log
LINE IS 1:500000-737000

ELSE
path ../MTX2/mtx_from_chr1.mt
LINE IS rs12028261

LINE IS rs3131984

LINE IS rs4951928
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "/home/ani/Documents/TRUNC/TRUNC/TESTS/mtx2rep2.py", line 64, in funMP
    mtx = hl.read_matrix_table(pathToMTXFolderAndTemplate + str(j) + ".mt")
  File "<decorator-gen-1352>", line 2, in read_matrix_table
  File "/home/ani/.local/lib/python3.9/site-packages/hail/typecheck/check.py", line 577, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/home/ani/.local/lib/python3.9/site-packages/hail/methods/impex.py", line 2126, in read_matrix_table
    for rg_config in Env.backend().load_references_from_dataset(path):
  File "/home/ani/.local/lib/python3.9/site-packages/hail/backend/spark_backend.py", line 328, in load_references_from_dataset
    return json.loads(self.hail_package().variant.ReferenceGenome.fromHailDataset(self.fs._jfs, path))
  File "/home/ani/.local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1644, in __getattr__
    raise Py4JError("{0} does not exist in the JVM".format(new_fqn))
py4j.protocol.Py4JError: is.hail.variant does not exist in the JVM
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/ani/Documents/TRUNC/TRUNC/TESTS/mtx2rep2.py", line 97, in <module>
    p.map(funMP, linesF)#'''
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
py4j.protocol.Py4JError: is.hail.variant does not exist in the JVM

Interestingly, that doesn’t occur on Mac, but occurs on Cent OS and Ubuntu.

I don’t know if hail can be multiprocessed, never tried, I think it’s not really necessary. Hail starts Spark, Spark runs a job in parallel using all machine cores available. So multiprocessing shouldn’t really give you an advantage here anyway I’d think.

Be that as it may, error seems like it might be the same one as this person ran into: python - multi-processing with spark(PySpark) - Stack Overflow