Py4JError: An error occurred while calling o1.pyPersistTable when running hl.hwe_normalized_pca()

We are running hl.hwe_normalized_pca() with both the default parameters and with k=5, i.e.,

eigenvalues, scores, loadings = hl.hwe_normalized_pca(mt_gt.GT) or eigenvalues, scores, loadings = hl.hwe_normalized_pca(mt_gt.GT, k=5), respectively. We plan to use the PCA output for a GWAS analysis.

We are running this on a gcloud cluster of n1-highmem-8 machines, autoscaled to a maximum of 1,000 secondary workers. The job goes through 2 collect stages, several treeAggregate stages, and another collect stage. In the last stage we get Py4JError: An error occurred while calling o1.pyPersistTable (full traceback below). We are currently running this on a small test set in a Jupyter notebook on Dataproc; as we scale up, we plan to run it as a Python script on Dataproc. Any help would be appreciated.

2023-06-01 18:15:16 Hail: INFO: hwe_normalize: found 38608084 variants after filtering out monomorphic sites.
2023-06-01 19:04:51 Hail: INFO: pca: running PCA with 5 components...
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1207, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1033, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1211, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:37385)
Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3398, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-8-cbee9812c5f6>", line 1, in <cell line: 1>
    eigenvalues, scores, loadings = hl.hwe_normalized_pca(mt_gt.GT, k=5)
  File "<decorator-gen-1647>", line 2, in hwe_normalized_pca
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/hail/typecheck/check.py", line 577, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/hail/methods/pca.py", line 96, in hwe_normalized_pca
    return pca(hwe_normalize(call_expr),
  File "<decorator-gen-1649>", line 2, in pca
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/hail/typecheck/check.py", line 577, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/hail/methods/pca.py", line 196, in pca
    t = (Table(ir.MatrixToTableApply(mt._mir, {
  File "<decorator-gen-1077>", line 2, in persist
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/hail/typecheck/check.py", line 577, in wrapper
    return __original_func(*args_, **kwargs_)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/hail/table.py", line 1937, in persist
    return Env.backend().persist_table(self, storage_level)
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/hail/backend/spark_backend.py", line 296, in persist_table
    return Table._from_java(self._jbackend.pyPersistTable(storage_level, self._to_java_table_ir(t._tir)))
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/hail/backend/py4j_backend.py", line 21, in deco
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 334, in get_return_value
    raise Py4JError(
py4j.protocol.Py4JError: An error occurred while calling o1.pyPersistTable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/conda/miniconda3/lib/python3.8/site-packages/executing/executing.py", line 317, in executing
    args = executing_cache[key]
KeyError: (<code object run_code at 0x7f82284ba660, file "/opt/conda/miniconda3/lib/python3.8/site-packages/IPython/core/interactiveshell.py", line 3362>, 140196998522464, 74)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 977, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1115, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
Input In [8], in <cell line: 1>()
----> 1 eigenvalues, scores, loadings = hl.hwe_normalized_pca(mt_gt.GT, k=5)

File <decorator-gen-1647>:2, in hwe_normalized_pca(call_expr, k, compute_loadings)

File /opt/conda/miniconda3/lib/python3.8/site-packages/hail/typecheck/check.py:577, in _make_dec.<locals>.wrapper(__original_func, *args, **kwargs)
    574 @decorator
    575 def wrapper(__original_func, *args, **kwargs):
    576     args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577     return __original_func(*args_, **kwargs_)

File /opt/conda/miniconda3/lib/python3.8/site-packages/hail/methods/pca.py:96, in hwe_normalized_pca(call_expr, k, compute_loadings)
     36 @typecheck(call_expr=expr_call,
     37            k=int,
     38            compute_loadings=bool)
     39 def hwe_normalized_pca(call_expr, k=10, compute_loadings=False) -> Tuple[List[float], Table, Table]:
     40     r"""Run principal component analysis (PCA) on the Hardy-Weinberg-normalized
     41     genotype call matrix.
     42 
   (...)
     93         List of eigenvalues, table with column scores, table with row loadings.
     94     """
---> 96     return pca(hwe_normalize(call_expr),
     97                k,
     98                compute_loadings)

File <decorator-gen-1649>:2, in pca(entry_expr, k, compute_loadings)

File /opt/conda/miniconda3/lib/python3.8/site-packages/hail/typecheck/check.py:577, in _make_dec.<locals>.wrapper(__original_func, *args, **kwargs)
    574 @decorator
    575 def wrapper(__original_func, *args, **kwargs):
    576     args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577     return __original_func(*args_, **kwargs_)

File /opt/conda/miniconda3/lib/python3.8/site-packages/hail/methods/pca.py:196, in pca(entry_expr, k, compute_loadings)
    193     mt = mt.select_entries(**{field: entry_expr})
    194 mt = mt.select_cols().select_rows().select_globals()
--> 196 t = (Table(ir.MatrixToTableApply(mt._mir, {
    197     'name': 'PCA',
    198     'entryField': field,
    199     'k': k,
    200     'computeLoadings': compute_loadings
    201 })).persist())
    203 g = t.index_globals()
    204 scores = hl.Table.parallelize(g.scores, key=list(mt.col_key))

File <decorator-gen-1077>:2, in persist(self, storage_level)

File /opt/conda/miniconda3/lib/python3.8/site-packages/hail/typecheck/check.py:577, in _make_dec.<locals>.wrapper(__original_func, *args, **kwargs)
    574 @decorator
    575 def wrapper(__original_func, *args, **kwargs):
    576     args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 577     return __original_func(*args_, **kwargs_)

File /opt/conda/miniconda3/lib/python3.8/site-packages/hail/table.py:1937, in Table.persist(self, storage_level)
   1901 @typecheck_method(storage_level=storage_level)
   1902 def persist(self, storage_level='MEMORY_AND_DISK') -> 'Table':
   1903     """Persist this table in memory or on disk.
   1904 
   1905     Examples
   (...)
   1935         Persisted table.
   1936     """
-> 1937     return Env.backend().persist_table(self, storage_level)

File /opt/conda/miniconda3/lib/python3.8/site-packages/hail/backend/spark_backend.py:296, in SparkBackend.persist_table(self, t, storage_level)
    295 def persist_table(self, t, storage_level):
--> 296     return Table._from_java(self._jbackend.pyPersistTable(storage_level, self._to_java_table_ir(t._tir)))

File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File /opt/conda/miniconda3/lib/python3.8/site-packages/hail/backend/py4j_backend.py:21, in handle_java_exception.<locals>.deco(*args, **kwargs)
     19 import pyspark
     20 try:
---> 21     return f(*args, **kwargs)
     22 except py4j.protocol.Py4JJavaError as e:
     23     s = e.java_exception.toString()

File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py:334, in get_return_value(answer, gateway_client, target_id, name)
    330             raise Py4JError(
    331                 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332                 format(target_id, ".", name, value))
    333     else:
--> 334         raise Py4JError(
    335             "An error occurred while calling {0}{1}{2}".
    336             format(target_id, ".", name))
    337 else:
    338     type = answer[1]

Py4JError: An error occurred while calling o1.pyPersistTable

There should be details in the worker & driver logs about what exactly went wrong. The JVM has apparently crashed. This can happen when the native code is not compiled for the correct platform, when the cluster has not been initialized properly, or when there is insufficient memory on the cluster.

It’s hard to assess the memory needs without seeing the full script leading up to the PCA calculation. Can you share it?

You mention a gcloud cluster. Did you start it with hailctl dataproc start? Can you share the full invocation?

What happens after the PCA? Do you immediately write the scores to a file, or do you use them in some other calculation? I recommend saving them to a file with write, then reading that file back and using it in downstream computations.
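A minimal sketch of that write-then-read pattern, assuming a MatrixTable `mt_gt` with a GT entry field (the bucket path is a placeholder, not a real location):

```python
import hail as hl

# Run PCA, then immediately checkpoint the scores to cloud storage
# (placeholder path; replace with your own bucket).
eigenvalues, scores, loadings = hl.hwe_normalized_pca(mt_gt.GT, k=5)
scores.write('gs://my-bucket/pca_scores.ht', overwrite=True)

# Downstream computations read the persisted table instead of
# recomputing the whole PCA pipeline.
scores = hl.read_table('gs://my-bucket/pca_scores.ht')
```

Reading the written table back breaks the lineage, so later steps don't re-trigger the expensive PCA job.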

Hi.

We are creating our cluster with the following command

hailctl dataproc \                              
    start cluster \
    --vep GRCh38 \
    --autoscaling-policy=autoscaling_policy \
    --requester-pays-allow-annotation-db \
    --packages gnomad \
    --requester-pays-allow-buckets gnomad-public-requester-pays \
    --secondary-worker-type=non-preemptible \
    --master-machine-type=n1-highmem-8 \
    --worker-machine-type=n1-highmem-8 \
    --worker-boot-disk-size=1000 \
    --preemptible-worker-boot-disk-size=1000 \
    --properties=dataproc:dataproc.logging.stackdriver.enable=true,dataproc:dataproc.monitoring.stackdriver.enable=true

We are connecting to the notebook via

 hailctl dataproc connect cluster notebook

We have also tried running it as a python script, since posting this issue, instead of a notebook using

hailctl dataproc submit cluster script.py

We do not get an error message when running it this way. Instead, the last line of output before it fails is

[Stage 188:===================================================(2586 + 1) / 2586]

Looking in the Hail log, the last line written is

2023-06-03 17:09:44 MemoryStore: INFO: Block broadcast_300 stored as values in memory (estimated size 13.2 GiB, free 8.5 GiB)

We are currently not doing anything with the output. Our plan is to store it in the cloud and read it into future scripts, but for now we are just running the PCA function on a small test set to see how it runs, what the output looks like, and how to store it.

Our current script looks like this:

#!/usr/bin/env python
# coding: utf-8
# #### PCA methods test

# In[2]:
## Hail initialization
import hail as hl
from hail.plot import show
from pprint import pprint

hl.stop()
hl.init(spark_conf={"spark.executor.extraClassPath": "/foo/bar/baz.so"}, log='/home/hail/hail.log')
#hl.stop()
#hl.init()

# In[3]:
#| code-fold: true
#| output: false
from io import StringIO
import time
import pandas as pd
#import pandas_gbq
# Ignore warning when renaming copied columns
pd.options.mode.chained_assignment = None
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
# Use numerical instead of exponential log tick notation
from matplotlib.ticker import ScalarFormatter
import plotly.express as px
import plotly.io as pio
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from google.cloud import storage
import yaml
from google.cloud import bigquery as bq
from collections import Counter

# In[4]:
mt_gt = hl.read_matrix_table('gs://PATH_TO_MT')

# In[5]:
mt_gt.describe()

# In[ ]:
# Filter MT based on population
mt_gt = mt_gt.filter_cols(mt_gt.super_pop == 'AMR')

# In[13]:
print(f"Count of rows and columns: {mt_gt.count()}")
# 663351127, 678

# In[21]:
mt_gt = mt_gt.select_cols()
mt_gt = mt_gt.select_rows()
mt_gt = mt_gt.select_entries(mt_gt.GT)

# In[23]:
eigenvalues, scores, loadings = hl.hwe_normalized_pca(mt_gt.GT, k=5)
print("PCA Done")

# In[ ]:
scores.describe()

# In[ ]:
# eigenvalues is a Python list of floats, not a Table, so print it
# instead of calling describe()
pprint(eigenvalues)

# In[ ]:
loadings.describe()

You won’t be able to successfully run PCA on this many variants. Folks usually use 5,000-10,000 common variants. If, after filtering to common variants, you still have too many, you can use sample_rows to downsample further. You’ll want to write out the loci you keep so you know which ones you used for PCA.
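A sketch of that workflow, again assuming a MatrixTable `mt_gt` with a GT entry field; the 5% frequency cutoff, the sampling fraction, and the bucket path are all illustrative placeholders:

```python
import hail as hl

# Keep common variants (illustrative 5% cutoff on the alternate
# allele frequency, filtering out both rare and near-fixed sites).
mt = hl.variant_qc(mt_gt)
mt = mt.filter_rows((mt.variant_qc.AF[1] > 0.05) & (mt.variant_qc.AF[1] < 0.95))

# If there are still too many variants, randomly downsample rows.
mt = mt.sample_rows(0.001, seed=42)

# Write out the loci actually used, so the PCA is reproducible
# (placeholder path).
mt.rows().select().write('gs://my-bucket/pca_variants.ht', overwrite=True)

eigenvalues, scores, loadings = hl.hwe_normalized_pca(mt.GT, k=5)
```

The saved row table can later be joined back against new data to project additional samples onto the same variant set.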


Thank you for the reply. We filtered for common variants with a 5% cutoff, pruned with ld_prune, used the subset of variants that overlapped with published data from the 1000 Genomes database, and our test run succeeded.
