Using Hadoop and Spark with Hail 0.2.83

Hi
I’m running Hail 0.2.83 on a cluster with Python 3.8, Hadoop 3.2.1 and Spark 3.1.2, and I’m experiencing problems.
I am trying to read data from VCFs into a MatrixTable, save the MatrixTable, then load it back and subset it to just one chromosome.

Reading the VCFs, saving the MatrixTable and loading it back from file all appear to work fine, and filtering by interval doesn’t report an error:

mt1 = hl.filter_intervals(mt, [hl.parse_locus_interval('chrX')])

However, if I try to do anything with the output MatrixTable mt1 (for example show, count or write to file), I get an error. I think this may be something to do with Hadoop/Spark: the error claims that a file which exists and is readable from the command line cannot be found.
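Roughly, the whole workflow looks like this (the VCF glob is just a placeholder, and lustre_dir is the variable defined in the initialisation snippet below):

# Import the VCFs and write them out as a MatrixTable (the VCF path is a placeholder)
mt = hl.import_vcf(lustre_dir + '/vcfs/*.vcf.gz', force_bgz=True)
mt.write(lustre_dir + '/matrixtables/gatk_unprocessed.mt', overwrite=True)

# Load the MatrixTable back and subset to one chromosome
mt = hl.read_matrix_table(lustre_dir + '/matrixtables/gatk_unprocessed.mt')
mt1 = hl.filter_intervals(mt, [hl.parse_locus_interval('chrX')])
mt1.count()  # this is where the error below appears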

Can anyone offer any advice?

I have initialised Hail as follows:

import hail as hl
import pyspark
sc = pyspark.SparkContext()
hadoop_config = sc._jsc.hadoopConfiguration()
hadoop_config.set("fs.s3a.access.key", "")
hadoop_config.set("fs.s3a.secret.key", "")
hadoop_config.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_config.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

tmp_dir = "hdfs://spark-master:9820/"
temp_dir = "file:///home/ubuntu/data/tmp"
lustre_dir = "file:///lustre/scratch123/qc"
hl.init(sc=sc, tmp_dir=lustre_dir, local_tmpdir=lustre_dir, default_reference="GRCh38")

The error I get with mt1.count() is too long to paste in its entirety, so here are the start and end; the errors from mt1.show() and mt1.write() are similar:

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
Input In [9], in <module>
----> 1 mt.count()

File ~/venv/lib/python3.8/site-packages/hail/matrixtable.py:2441, in MatrixTable.count(self)
   2428 """Count the number of rows and columns in the matrix.
   2429 
   2430 Examples
   (...)
   2438     Number of rows, number of cols.
   2439 """
   2440 count_ir = ir.MatrixCount(self._mir)
-> 2441 return Env.backend().execute(count_ir)

File ~/venv/lib/python3.8/site-packages/hail/backend/py4j_backend.py:110, in Py4JBackend.execute(self, ir, timed)
    104     message_and_trace = (f'{error_message}\n'
    105                          '------------\n'
    106                          'Hail stack trace:\n'
    107                          f'{better_stack_trace}')
    108     raise HailUserError(message_and_trace) from None
--> 110 raise e

File ~/venv/lib/python3.8/site-packages/hail/backend/py4j_backend.py:86, in Py4JBackend.execute(self, ir, timed)
     84 # print(self._hail_package.expr.ir.Pretty.apply(jir, True, -1))
     85 try:
---> 86     result_tuple = self._jhc.backend().executeEncode(jir, stream_codec)
     87     (result, timings) = (result_tuple._1(), result_tuple._2())
     88     value = ir.typ._from_encoding(result)

File ~/venv/lib/python3.8/site-packages/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File ~/venv/lib/python3.8/site-packages/hail/backend/py4j_backend.py:29, in handle_java_exception.<locals>.deco(*args, **kwargs)
     27         raise FatalError('Error summary: %s' % (deepest,), error_id) from None
     28     else:
---> 29         raise FatalError('%s\n\nJava stack trace:\n%s\n'
     30                          'Hail version: %s\n'
     31                          'Error summary: %s' % (deepest, full, hail.__version__, deepest), error_id) from None
     32 except pyspark.sql.utils.CapturedException as e:
     33     raise FatalError('%s\n\nJava stack trace:\n%s\n'
     34                      'Hail version: %s\n'
     35                      'Error summary: %s' % (e.desc, e.stackTrace, hail.__version__, e.desc)) from None

FatalError: RemoteException: File does not exist: /lustre/scratch123/qc/matrixtables/gatk_unprocessed.mt/rows/rows/parts/part-5562-f7c670ae-6f02-458b-9bf9-485c33dc4fed
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1954)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)


Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 5.0 failed 4 times, most recent failure: Lost task 44.3 in stage 5.0 (TID 234) (192.168.252.100 executor 2): java.io.FileNotFoundException: File does not exist: /lustre/scratch123/qc/matrixtables/gatk_unprocessed.mt/rows/rows/parts/part-5562-f7c670ae-6f02-458b-9bf9-485c33dc4fed
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1954)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)


[...]

	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)





Hail version: 0.2.83-b3151b4c4271
Error summary: RemoteException: File does not exist: /lustre/scratch123/qc/matrixtables/gatk_unprocessed.mt/rows/rows/parts/part-5562-f7c670ae-6f02-458b-9bf9-485c33dc4fed
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:86)
	at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:76)
	at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getBlockLocations(FSDirStatAndListingOp.java:158)
	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1954)
	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:755)
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:439)
	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:528)
	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1070)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:999)
	at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:927)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2915)

Hi Ruth,

Sorry for the delayed reply; this post got autoflagged as spam for some reason (definitely a mistake).

I’m guessing the issue here is your tmp_dir=lustre_dir line. tmp_dir has to be somewhere available to all computers in the cluster, like HDFS or S3; it can’t be a local file path.
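For example, something like this, reusing the HDFS address from your own snippet (the /tmp suffix is just illustrative):

# tmp_dir needs to be on a file system every executor can reach, e.g. HDFS or S3;
# local_tmpdir can point at each node's local disk.
hl.init(sc=sc,
        tmp_dir="hdfs://spark-master:9820/tmp",
        local_tmpdir="file:///home/ubuntu/data/tmp",
        default_reference="GRCh38")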

Thank you for your help. Removing the “file://” prefix from lustre_dir when saving and reading MatrixTables did the trick. I’m very much a novice at this and not sure when “file:/path/to/dir”, when “file:///path/to/dir” and when “/path/to/dir” should be used, so I’m learning by trial and error at present!
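So the save/read calls now look roughly like this:

lustre_dir = "/lustre/scratch123/qc"   # no "file://" prefix
mt.write(lustre_dir + '/matrixtables/gatk_unprocessed.mt', overwrite=True)
mt = hl.read_matrix_table(lustre_dir + '/matrixtables/gatk_unprocessed.mt')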


file means the local file system, so it refers to a file path on a particular computer. hdfs means the Hadoop distributed file system, which is a sort of shared file system on the network: the file still gets written to a particular computer, but it’s advertised in a way where other computers can see it. s3 is also visible to the whole network, but it persists after the cluster is deleted, since it gets written to an S3 bucket.
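A quick sketch of the three, with made-up paths (the bucket name is hypothetical; S3 access would go through the fs.s3a keys configured above):

# Local disk on one particular machine:
mt.write('file:///home/ubuntu/data/my_dataset.mt')

# HDFS: shared across the cluster, but gone once the cluster is torn down:
mt.write('hdfs://spark-master:9820/my_dataset.mt')

# S3: shared across the cluster and persists after the cluster is deleted:
mt.write('s3a://my-bucket/my_dataset.mt')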