Unread block data error (Spark master - slave issue)

I am trying to run a basic script on a Spark cluster that takes in a file, converts it, and outputs it in a different format. At the moment the Spark cluster consists of one master and one slave, both running on the same node. The full command is:

nohup spark-submit --master spark://tr-nodedev1:7077 --verbose --conf spark.driver.port=40065 --driver-memory 4g --conf spark.driver.extraClassPath=/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/hail-all-spark.jar --conf spark.executor.extraClassPath=./hail-all-spark.jar ./hail_scripts/v02/convert_vcf_to_hail.py /clinvar_37.vcf -ht --genome-version 37 --output /seqr-reference-hail2/clinvar_37.ht &

And it fails with the following error:

Traceback (most recent call last):
File "/opt/seqr/hail-elasticsearch-pipelines/./hail_scripts/v02/convert_vcf_to_hail.py", line 38, in <module>
mt = import_vcf(vcf_path, args.genome_version, force_bgz=True, min_partitions=10000, drop_samples=True)
File "/opt/seqr/hail-elasticsearch-pipelines/hail_scripts/v02/utils/hail_utils.py", line 71, in import_vcf
skip_invalid_loci=skip_invalid_loci)
File "</opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/decorator.py:decorator-gen-1246>", line 2, in import_vcf
File "/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/typecheck/check.py", line 585, in wrapper
return original_func(*args, **kwargs)
File "/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/methods/impex.py", line 2106, in import_vcf
return MatrixTable(MatrixRead(reader, drop_cols=drop_samples))
File "/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/matrixtable.py", line 557, in __init__
self._type = self._mir.typ
File "/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/ir/base_ir.py", line 328, in typ
self._compute_type()
File "/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/ir/matrix_ir.py", line 60, in _compute_type
self._type = Env.backend().matrix_type(self)
File "/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/backend/backend.py", line 124, in matrix_type
jir = self._to_java_ir(mir)
File "/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/backend/backend.py", line 105, in _to_java_ir
ir._jir = ir.parse(r(ir), ir_map=r.jirs)
File "/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/ir/base_ir.py", line 336, in parse
return Env.hail().expr.ir.IRParser.parse_matrix_ir(code, ref_map, ir_map)
File "/opt/seqr/spark/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/utils/java.py", line 225, in deco
'Error summary: %s' % (deepest, full, hail.version, deepest)) from None
hail.utils.java.FatalError: IllegalStateException: unread block data

I looked online and found a thread suggesting to export the JAVA_HOME environment variable in spark-env.sh. I did that and restarted the cluster, but it did not help.

The following command, however, works fine:

spark-submit --conf spark.driver.extraClassPath=/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/hail-all-spark.jar --conf spark.executor.extraClassPath=./hail-all-spark.jar ./hail_scripts/v02/convert_vcf_to_hail.py /hgmd_pro_2019.3_hg19_noDB.vcf -ht --genome-version 37 --output /seqr-reference-hail2/hgmd_2019.3_hg19_noDB.ht

That command just runs in the terminal (in local mode, I guess). So I suspect the main issue is that the driver and the slave somehow differ in their Java setup. How could that be fixed, given that just setting JAVA_HOME does not work?

Can you post the full error message and stack trace? We need that to fully understand the issue.

It indeed sounds like you have different java versions installed on the leader node and the follower. What happens if you run java -version on the leader and on the follower? Are they the same?

I do not know how to check that separately on the leader and the follower, since they are one and the same node. That is also why I am confused: I would expect both the driver and the slave to run on one node with an identical environment. I started the master on one node and the slave on that very same node. The rest of the stack trace looks like this:

Java stack trace:
org.json4s.package$MappingException: unknown error
at org.json4s.Extraction$.extract(Extraction.scala:43)
at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
at org.json4s.jackson.Serialization$.read(Serialization.scala:50)
at org.json4s.Serialization$class.read(Serialization.scala:30)
at org.json4s.jackson.Serialization$.read(Serialization.scala:17)
at is.hail.expr.ir.IRParser$.deserialize(Parser.scala:160)
at is.hail.expr.ir.IRParser$.matrix_ir_1(Parser.scala:1277)
at is.hail.expr.ir.IRParser$.matrix_ir(Parser.scala:1213)
at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1462)
at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1462)
at is.hail.expr.ir.IRParser$.parse(Parser.scala:1446)
at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1462)
at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1461)
at is.hail.expr.ir.IRParser.parse_matrix_ir(Parser.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.json4s.reflect.Executable.invoke(Executable.scala:52)
at org.json4s.Extraction$ClassInstanceBuilder.instantiate(Extraction.scala:554)
at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:597)
at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:400)
at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:392)
at org.json4s.Extraction$.customOrElse(Extraction.scala:606)
at org.json4s.Extraction$.extract(Extraction.scala:392)
at org.json4s.Extraction$ClassInstanceBuilder.mkWithTypeHint(Extraction.scala:587)
at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:596)
at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:400)
at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:392)
at org.json4s.Extraction$.customOrElse(Extraction.scala:606)
at org.json4s.Extraction$.extract(Extraction.scala:392)
at org.json4s.Extraction$.extract(Extraction.scala:39)
at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
at org.json4s.jackson.Serialization$.read(Serialization.scala:50)
at org.json4s.Serialization$class.read(Serialization.scala:30)
at org.json4s.jackson.Serialization$.read(Serialization.scala:17)
at is.hail.expr.ir.IRParser$.deserialize(Parser.scala:160)
at is.hail.expr.ir.IRParser$.matrix_ir_1(Parser.scala:1277)
at is.hail.expr.ir.IRParser$.matrix_ir(Parser.scala:1213)
at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1462)
at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1462)
at is.hail.expr.ir.IRParser$.parse(Parser.scala:1446)
at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1462)
at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1461)
at is.hail.expr.ir.IRParser.parse_matrix_ir(Parser.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 137.187.60.61, executor 0): java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2782)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1604)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:376)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:927)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
at is.hail.io.vcf.MatrixVCFReader.<init>(LoadVCF.scala:1491)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.json4s.reflect.Executable.invoke(Executable.scala:52)
at org.json4s.Extraction$ClassInstanceBuilder.instantiate(Extraction.scala:554)
at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:597)
at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:400)
at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:392)
at org.json4s.Extraction$.customOrElse(Extraction.scala:606)
at org.json4s.Extraction$.extract(Extraction.scala:392)
at org.json4s.Extraction$ClassInstanceBuilder.mkWithTypeHint(Extraction.scala:587)
at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:596)
at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:400)
at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:392)
at org.json4s.Extraction$.customOrElse(Extraction.scala:606)
at org.json4s.Extraction$.extract(Extraction.scala:392)
at org.json4s.Extraction$.extract(Extraction.scala:39)
at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
at org.json4s.jackson.Serialization$.read(Serialization.scala:50)
at org.json4s.Serialization$class.read(Serialization.scala:30)
at org.json4s.jackson.Serialization$.read(Serialization.scala:17)
at is.hail.expr.ir.IRParser$.deserialize(Parser.scala:160)
at is.hail.expr.ir.IRParser$.matrix_ir_1(Parser.scala:1277)
at is.hail.expr.ir.IRParser$.matrix_ir(Parser.scala:1213)
at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1462)
at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1462)
at is.hail.expr.ir.IRParser$.parse(Parser.scala:1446)
at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1462)
at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1461)
at is.hail.expr.ir.IRParser.parse_matrix_ir(Parser.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2782)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1604)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2286)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2210)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2068)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1572)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:376)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Here is how I am trying to set Java in spark-env.sh:

export JAVA_HOME=/usr/lib/jvm/java
export SPARK_JAVA_OPTS+=" -Djava.library.path=$SPARK_LIBRARY_PATH:$JAVA_HOME"

Do you know how I could check the master and slave environment variables while they are both running on one node? I just have no idea…

Are you able to execute non-Hail Spark jobs? You should first make sure Spark works fine without Hail before trying to get Hail working on Spark.
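
As for checking the environments even though everything runs on one node: the driver and the executors are still separate processes, so you can compare what each of them reports from inside a small job. Something along these lines (just a sketch, adjust the master URL to yours) prints the Java version and JAVA_HOME seen by the driver and by the executors:

import os
from pyspark.sql import SparkSession

# Build a session against the standalone master (adjust the URL).
spark = SparkSession.builder.master('spark://ai-grisnodedev1:7077').appName('env-check').getOrCreate()
sc = spark.sparkContext

# What the driver JVM and the driver process see.
print('driver java.version:', sc._jvm.java.lang.System.getProperty('java.version'))
print('driver JAVA_HOME:', os.environ.get('JAVA_HOME'))

# What the executors see: run a tiny task on each partition and collect the results.
def executor_env(_):
    import os, subprocess
    version = subprocess.run(['java', '-version'], capture_output=True, text=True).stderr.splitlines()[0]
    yield (version, os.environ.get('JAVA_HOME'))

print('executors:', set(sc.parallelize(range(4), 4).mapPartitions(executor_env).collect()))

Note that the java binary and JAVA_HOME checked inside the task are the ones visible to the executor's Python worker rather than the executor JVM itself, but that is usually a good enough proxy.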

It seems to me like a master-slave issue, but I have not yet tried to execute a plain Spark job. I assume it will fail, but I am going to try.

Ok, I just put these lines from the Hail 0.2 installation guide into test.py:

import hail as hl
mt = hl.balding_nichols_model(n_populations=3, n_samples=50, n_variants=100)
mt.count()
print('Success!')

And it runs successfully with the command:

nohup spark-submit --master spark://ai-grisnodedev1:7077 --verbose --conf spark.driver.port=40065 --driver-memory 4g --conf spark.driver.extraClassPath=/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/hail-all-spark.jar --conf spark.executor.extraClassPath=./hail-all-spark.jar test_hail.py &

The script

./hail_scripts/v02/convert_vcf_to_hail.py

whose stack trace I showed above can be found here:

https://github.com/macarthur-lab/hail-elasticsearch-pipelines/blob/master/hail_scripts/v02/convert_vcf_to_hail.py

And so the

import_vcf()

function somehow failed with the error. So I added two more lines to test.py to check whether a simple import_vcf call fails:

clinvar_37 = hl.import_vcf('/clinvar_37.vcf')
clinvar_37.describe()

And it failed with precisely the same error on the import_vcf() line.

I checked with the hl.utils.hadoop_exists method and the file does exist. I still think it is a master-slave issue, and I suspect it could be related to the fact that Hail 0.2 should be run inside a Conda virtual environment: if, say, the worker somehow escapes that environment, there would obviously be a problem.
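
One way I could test that hypothesis (a quick sketch; the conda path is the one from my spark-submit commands above) is to print which Python interpreter the driver and the executors' Python workers actually use:

import sys
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster('spark://ai-grisnodedev1:7077').setAppName('python-check')
sc = SparkContext(conf=conf)

# Interpreter used by the driver (should be the conda py37 one).
print('driver python:', sys.executable)

# Interpreter used by each executor's Python worker.
def worker_python(_):
    import sys
    yield sys.executable

print('executor python:', set(sc.parallelize(range(4), 4).mapPartitions(worker_python).collect()))

If the executor side did not point at the py37 environment, setting PYSPARK_PYTHON (or the spark.pyspark.python property) would be the first thing I would try.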

Submitting a plain Spark job works fine. I submitted the following test script:

from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

# Note: a master set on the SparkConf takes precedence over the --master
# flag passed to spark-submit, so 'local' here keeps the job in local mode.
conf = SparkConf().setMaster('local').setAppName('errors')
sc = SparkContext(conf=conf)

# Count the lines of a small file sitting in HDFS.
header_file = sc.textFile('hdfs://137.187.60.61:9000/BATCH9_header.txt')
num_lines = header_file.count()
print('num_lines in header: %s' % num_lines)

with the command:

spark-submit --master spark://ai-grisnodedev1:7077 --verbose --conf spark.driver.port=40065 --driver-memory 4g test_spark.py

So plain Spark jobs run fine. Then I tried modifying the path to the HDFS file in the basic test_hail.py script, but it gave the same 'unread block data' error. It seems it does not really matter how the path is specified in Hail's import_vcf: it can be just a path, or a full URI with the IP:

import hail as hl
file_exists = hl.utils.hadoop_exists('hdfs://137.187.60.61:9000/clinvar_37.vcf')
print('Exists? : %s' % file_exists)
clinvar_37 = hl.import_vcf('hdfs://137.187.60.61:9000/clinvar_37.vcf', reference_genome='GRCh37')
clinvar_37.describe()

Ok, we fixed it, and the solution was to add the following flag to our spark-submit command:

--jars /opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/hail-all-spark.jar

So, the working command is the following:

spark-submit --master spark://ai-grisnodedev1:7077 --verbose --conf spark.driver.port=40065 --driver-memory 4g --conf spark.driver.extraClassPath=/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/hail-all-spark.jar --conf spark.executor.extraClassPath=./hail-all-spark.jar --jars /opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/hail-all-spark.jar test_hail.py
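
If it helps anyone, the same thing should also be expressible without the command-line flag by putting the jar on spark.jars before Hail initializes (a sketch, assuming the same jar path as above):

import hail as hl
from pyspark import SparkConf, SparkContext

jar = '/opt/seqr/.conda/envs/py37/lib/python3.7/site-packages/hail/hail-all-spark.jar'

conf = (SparkConf()
        .setMaster('spark://ai-grisnodedev1:7077')
        .set('spark.jars', jar)  # distributes the Hail jar to the executors
        .set('spark.driver.extraClassPath', jar)
        .set('spark.executor.extraClassPath', './hail-all-spark.jar'))
sc = SparkContext(conf=conf)
hl.init(sc)  # hand the preconfigured context to Hail

The difference from my original command is spark.jars: extraClassPath only tells each JVM where to look on its local classpath, while spark.jars (the --jars flag) actually ships the jar to the executors, which is apparently what was missing.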

Great!