from_pandas triggering a Python version mismatch error

Hi hail team,

I’m running code that uses hl.Table.from_pandas and seeing a weird error. This is what I’m running in a notebook:

joint_pops_ht = hl.Table.from_pandas(joint_pops_pd, key=list(joint_scores_ht.key))

The command appears to be triggering this error:

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-39-2b21a7d41cb1> in <module>
----> 1 joint_pops_ht = hl.Table.from_pandas(joint_pops_pd, key=list(joint_scores_ht.key))
      2 
      3 scores_ht = joint_scores_ht.filter(hl.is_missing(joint_scores_ht.pop_for_rf))
      4 scores_ht = scores_ht.drop('pop_for_rf')
      5 joint_pops_ht = joint_pops_ht.drop('pop_for_rf')

<decorator-gen-1135> in from_pandas(df, key)

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586 
    587     return wrapper

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/table.py in from_pandas(df, key)
   3203         :class:`.Table`
   3204         """
-> 3205         return Env.spark_backend('from_pandas').from_pandas(df, key)
   3206 
   3207     @typecheck_method(other=table_type, tolerance=nullable(numeric), absolute=bool)

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/backend/backend.py in from_pandas(self, df, key)
    252 
    253     def from_pandas(self, df, key):
--> 254         return Table.from_spark(Env.spark_session().createDataFrame(df), key)
    255 
    256     def add_reference(self, config):

<decorator-gen-1129> in from_spark(df, key)

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586 
    587     return wrapper

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/table.py in from_spark(df, key)
   3140             Table constructed from the Spark SQL DataFrame.
   3141         """
-> 3142         return Env.spark_backend('from_spark').from_spark(df, key)
   3143 
   3144     @typecheck_method(flatten=bool)

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/backend/backend.py in from_spark(self, df, key)
    240 
    241     def from_spark(self, df, key):
--> 242         return Table._from_java(Env.jutils().pyFromDF(df._jdf, key))
    243 
    244     def to_spark(self, t, flatten):

/opt/conda/miniconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/conda/miniconda3/lib/python3.6/site-packages/hail/utils/java.py in deco(*args, **kwargs)
    209             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    210                              'Hail version: %s\n'
--> 211                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
    212         except pyspark.sql.utils.CapturedException as e:
    213             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 267, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.


Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 14 in stage 37.0 failed 20 times, most recent failure: Lost task 14.19 in stage 37.0 (TID 339543, kc2-w-60.c.maclab-ukbb.internal, executor 2430): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 267, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at is.hail.rvd.RVD$$anonfun$28.apply(RVD.scala:1227)
	at is.hail.rvd.RVD$$anonfun$28.apply(RVD.scala:1226)
	at is.hail.sparkextras.ContextRDD$$anonfun$crunJobWithIndex$1.apply(ContextRDD.scala:232)
	at is.hail.sparkextras.ContextRDD$$anonfun$crunJobWithIndex$1.apply(ContextRDD.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1892)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1880)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2113)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2062)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2051)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at is.hail.sparkextras.ContextRDD.crunJobWithIndex(ContextRDD.scala:228)
	at is.hail.rvd.RVD$.getKeyInfo(RVD.scala:1226)
	at is.hail.rvd.RVD$.makeCoercer(RVD.scala:1301)
	at is.hail.rvd.RVD$.coerce(RVD.scala:1256)
	at is.hail.rvd.RVD$.coerce(RVD.scala:1240)
	at is.hail.expr.ir.TableValue$.apply(TableValue.scala:35)
	at is.hail.utils.Py4jUtils$$anonfun$pyFromDF$1.apply(Py4jUtils.scala:166)
	at is.hail.utils.Py4jUtils$$anonfun$pyFromDF$1.apply(Py4jUtils.scala:165)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:15)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:13)
	at is.hail.utils.package$.using(package.scala:604)
	at is.hail.annotations.Region$.scoped(Region.scala:18)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:13)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:10)
	at is.hail.utils.Py4jUtils$class.pyFromDF(Py4jUtils.scala:165)
	at is.hail.utils.package$.pyFromDF(package.scala:76)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 267, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at is.hail.rvd.RVD$$anonfun$28.apply(RVD.scala:1227)
	at is.hail.rvd.RVD$$anonfun$28.apply(RVD.scala:1226)
	at is.hail.sparkextras.ContextRDD$$anonfun$crunJobWithIndex$1.apply(ContextRDD.scala:232)
	at is.hail.sparkextras.ContextRDD$$anonfun$crunJobWithIndex$1.apply(ContextRDD.scala:230)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)




Hail version: 0.2.38-3474ef537565
Error summary: PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 267, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

I updated gcloud. Is there something wrong with my cluster configuration?

Did you use hailctl dataproc to create this cluster? What command did you run to create it?

yup!

hailctl dataproc start kc3 --master-machine-type n1-highmem-8 --worker-machine-type n1-highmem-8 --init gs://gnomad-public/tools/inits/master-init.sh --worker-boot-disk-size 100 --project maclab-ukbb --max-idle=500m --properties=spark:spark.executor-memory=35g,spark:spark.speculation=true,spark:spark.speculation.quantile=0.9,spark:spark.speculation.multiplier=3 --num-workers=100

Hmm, this is a new one for me. A few things to check and a few questions:

As the error message suggests, can you check the values of PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON?
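
If it’s easier to check from the notebook, a sketch like this should show what the driver sees and which interpreter the executors actually launch (the executor half runs a tiny Python job on the workers, so it will die with the same mismatch error if they really are on a different Python):

import os
import sys
import hail as hl

# What the driver thinks it should be using:
print('driver python:', sys.executable, '%d.%d' % sys.version_info[:2])
print('PYSPARK_PYTHON:', os.environ.get('PYSPARK_PYTHON'))
print('PYSPARK_DRIVER_PYTHON:', os.environ.get('PYSPARK_DRIVER_PYTHON'))

# Ask a handful of executors which interpreter they actually launch.
def worker_python(_):
    import sys
    return (sys.executable, '%d.%d' % sys.version_info[:2])

sc = hl.spark_context()
print(sc.parallelize(range(20), 20).map(worker_python).distinct().collect())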

Can you show me the gcloud command that was generated by that hailctl dataproc command? If you don’t have it anymore, you can append --dry-run to the end of the hailctl dataproc command to see what gcloud command it makes without actually running it.

Is that master-init.sh initialization action new, or something you’ve used before?

gcloud beta dataproc clusters create \
    kc3 \
    --image-version=1.4-debian9 \
    --properties=^|||^spark:spark.task.maxFailures=20|||spark:spark.driver.extraJavaOptions=-Xss4M|||spark:spark.executor.extraJavaOptions=-Xss4M|||spark:spark.speculation=true|||hdfs:dfs.replication=1|||dataproc:dataproc.logging.stackdriver.enable=false|||dataproc:dataproc.monitoring.stackdriver.enable=false|||spark:spark.executor-memory=35g|||spark:spark.speculation.quantile=0.9|||spark:spark.speculation.multiplier=3|||spark:spark.driver.memory=41g \
    --initialization-actions=gs://hail-common/hailctl/dataproc/0.2.38/init_notebook.py,gs://gnomad-public/tools/inits/master-init.sh \
    --metadata=^|||^WHEEL=gs://hail-common/hailctl/dataproc/0.2.38/hail-0.2.38-py3-none-any.whl|||PKGS=aiohttp>=3.6,<3.7|aiohttp_session>=2.7,<2.8|asyncinit>=0.2.4,<0.3|bokeh>1.1,<1.3|decorator<5|gcsfs==0.2.1|humanize==1.0.0|hurry.filesize==0.9|nest_asyncio|numpy<2|pandas>0.24,<0.26|parsimonious<0.9|PyJWT|python-json-logger==0.1.11|requests>=2.21.0,<2.21.1|scipy>1.2,<1.4|tabulate==0.8.3|tqdm==4.42.1 \
    --master-machine-type=n1-highmem-8 \
    --master-boot-disk-size=100GB \
    --num-master-local-ssds=0 \
    --num-preemptible-workers=0 \
    --num-worker-local-ssds=0 \
    --num-workers=100 \
    --preemptible-worker-boot-disk-size=40GB \
    --worker-boot-disk-size=100GB \
    --worker-machine-type=n1-highmem-8 \
    --initialization-action-timeout=20m \
    --project=maclab-ukbb \
    --labels=creator=kchao_broadinstitute_org \
    --max-idle=500m

I’ve used that master-init.sh script for a while. Here is what it contains:

#!/bin/bash

set -x


# Installing numpy on workers for pyspark purposes
pip install numpy

ROLE=$(/usr/share/google/get_metadata_value attributes/dataproc-role)
if [[ "${ROLE}" == 'Master' ]]; then

    git clone https://github.com/broadinstitute/gnomad_methods.git /home/gnomad_methods

    mkdir -p /home/hail/
    ln -s /home/gnomad_methods/gnomad /home/hail

    cd /home/gnomad_methods/init_scripts
    chmod +x gnomad-init.sh sparklyr-init.sh

    # This is here so as not to have 2 apt-get processes fighting for a lock
    apt-get install -y tmux liblz4-dev

    ./gnomad-init.sh > gnomad_startup.log 2>&1 &
    ./sparklyr-init.sh > sparklyr_startup.log 2>&1 &

fi

I’ll check the PYSPARK values when I have a cluster running

I checked the values in $SPARK_HOME/conf/spark-env.sh on kc3-m, which showed:


# User-supplied properties.
export SPARK_DAEMON_MEMORY="4000m"
export PYTHONHASHSEED=0
export PYTHONPATH=/usr/lib/spark/python/lib/pyspark.zip:/usr/lib/spark/python/lib/py4j-0.10.7-src.zip
export SPARK_HOME=/usr/lib/spark/
export PYSPARK_PYTHON=/opt/conda/default/bin/python
export PYSPARK_DRIVER_PYTHON=/opt/conda/default/bin/python

is there somewhere else I should check?

I’m going to try creating a cluster with the same properties as yours and see if I can replicate.

Maybe ssh into your workers and check their conda version? If they have different conda versions, the default Python could be different (I’m still not sure how this situation would arise, though).

I ran conda info on the master and a worker node and got conda version : 4.5.4 for both

Does python --version on workers come back as 2.7?

nope: Python 3.6.10 :: Anaconda, Inc.

Is Hail generally working, and it’s just from_pandas that’s causing a problem? For instance, can you run something silly like

hl.utils.range_table(100).count()

?

[screenshot of notebook output]

I ran a bunch of other code yesterday that seemed to work fine, but somehow from_pandas failed every time

That’s pretty weird. I would think that Hail shouldn’t work at all if your workers were running the wrong Python version. Let me try running from_pandas on my cluster. As a workaround, you could always write the pandas DataFrame to a TSV and import it with hl.import_table.
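
Roughly what I have in mind, as a sketch (the gs:// path is a placeholder; writing to it straight from pandas assumes gcsfs is available, otherwise write the file locally and copy it up with gsutil):

import hail as hl

# Hypothetical temporary path -- replace with a bucket you control.
tmp_path = 'gs://my-bucket/tmp/joint_pops.tsv'

# Dump the pandas DataFrame to a TSV; this avoids createDataFrame entirely.
joint_pops_pd.to_csv(tmp_path, sep='\t', index=False)

# Re-import with Hail and key by the same fields as before. impute=True guesses
# column types; pass types= explicitly if you need exact ones.
joint_pops_ht = hl.import_table(tmp_path, impute=True)
joint_pops_ht = joint_pops_ht.key_by(*list(joint_scores_ht.key))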

thanks! yup, I tried that. I also didn’t run into this issue when I ran from_pandas in submit scripts on Monday/last week; not sure if that’s helpful?

I was wrong: most Hail code doesn’t care about the Python version on the workers, so it’s not terribly surprising that this only happens with from_pandas.
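
If you want to convince yourself it isn’t Hail-specific, the same failure should reproduce with plain PySpark, since from_pandas is basically createDataFrame followed by Table.from_spark (just a sketch):

import pandas as pd
from pyspark.sql import SparkSession

# Reuse the Spark session Hail already started in the notebook.
spark = SparkSession.builder.getOrCreate()

# createDataFrame from a pandas DataFrame converts the rows in a Python stage
# on the executors, so counting it should hit the same worker-side version check.
spark.createDataFrame(pd.DataFrame({'x': [1, 2, 3]})).count()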

Was this failure in a notebook? If this happens in notebooks but not in submit scripts, that would be useful info.

But if this is just something that worked last week and suddenly stopped working under the same conditions, it’s possible that Dataproc changed in some way.

yup, I ran into this in a notebook. I haven’t tried this again in a submit script but can.

I was on gcloud SDK version 283 until yesterday (I hadn’t updated in a while, and I updated just in case it might fix the error).

I was able to run from_pandas just fine on 0.2.38, using the same init scripts as you. The only differences were that my project wasn’t maclab and I used only 2 workers.

It probably won’t help anything, but our release process for 0.2.38 got a little messed up and I had to patch it manually. 0.2.39 was just released; maybe try updating and see if it works on the new version? If not, we’ll have to dig into it more.

Also interested to know if this happens to anyone else starting clusters in this project

thanks, I’ll try updating to 0.2.39! I’ll also ask someone else if they can try this as well