HAIL 0.1: export vcf hadoop error

Hail version:

0.1-74bf1eb

What you did:

Export vcf to local file:// path

What went wrong (all error messages here, including the full java stack trace):

When exporting a VCF to a path that begins with ‘file://’, I get the error: ClassNotFoundException: Class org.apache.hadoop.mapred.DirectFileOutputCommitter not found. I am using Spark 2.2.1 (prebuilt with Hadoop 2.7) with AWS-Hadoop 2.7.4. I have the following settings in my Spark config and am using a custom DirectParquetOutputCommitter. Standard Spark DataFrame writes to ‘file://’ paths work without issue.

Thanks for any help!

spark.sql.parquet.output.committer.class org.apache.spark.sql.parquet.DirectParquetOutputCommitter
spark.hadoop.mapred.output.committer.class org.apache.hadoop.mapred.DirectFileOutputCommitter
spark.hadoop.mapreduce.use.directfileoutputcommitter true
spark.hadoop.spark.sql.parquet.output.committer.class org.apache.spark.sql.parquet.DirectParquetOutputCommitter
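These spark.hadoop.* keys are copied into the Hadoop configuration that Spark’s saveAsTextFile reads, so whatever committer class they name has to be on Hail’s driver and executor classpath. Below is a minimal sketch of one possible workaround, assuming the direct committer is not actually needed for local ‘file://’ writes and that the SparkContext Hail is attached to is available as sc; it resets the old mapred API’s committer to Hadoop’s built-in default before exporting:

    # Sketch only: point the mapred output committer back at Hadoop's built-in
    # FileOutputCommitter so saveAsTextFile no longer tries to load the custom
    # DirectFileOutputCommitter class.
    sc._jsc.hadoopConfiguration().set(
        "mapred.output.committer.class",
        "org.apache.hadoop.mapred.FileOutputCommitter")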

Code and stack trace:

================================================================================================== FAILURES ===================================================================================================
__________________________________________________________________________________________ TestHAIL.test_export_vcf ___________________________________________________________________________________________

self = <test_hail.TestHAIL testMethod=test_export_vcf>

    def test_export_vcf(self):
        # define files
        bgen_file = os.path.join(self.testdir, 'example.10bits.bgen')
        sample_file = os.path.join(self.testdir, 'example.sample')
        # make index
        self.hc.index_bgen(bgen_file)
        # load to vds
        bgen_vds = self.hc.import_bgen(bgen_file, sample_file=sample_file)
        # export vcf
        out_path = 'file://' + os.path.join(self.tmpdir, 'test_vcf_export.vcf.bgz')
>       bgen_vds.export_vcf(out_path, export_pp=False, parallel=False)

tests/hail/test_hail.py:55:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
<decorator-gen-398>:2: in export_vcf
    ???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

func = <function export_vcf at 0x7fa13c4d9938>, args = (<hail.dataset.VariantDataset object at 0x7fa13c3c9390>, 'file:///scratch/test_vcf_export.vcf.bgz', None, False, False), kwargs = {}
e = Py4JJavaError(u'An error occurred while calling o160.exportVCF.\n', JavaObject id=o162), tpl = JavaObject id=o210
deepest = 'ClassNotFoundException: Class org.apache.hadoop.mapred.DirectFileOutputCommitter not found'
full = 'java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.map...mmand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)


'

    @decorator
    def handle_py4j(func, *args, **kwargs):
        try:
            r = func(*args, **kwargs)
        except py4j.protocol.Py4JJavaError as e:
            tpl = Env.jutils().handleForPython(e.java_exception)
            deepest, full = tpl._1(), tpl._2()
            raise FatalError('%s\n\nJava stack trace:\n%s\n'
                             'Hail version: %s\n'
>                            'Error summary: %s' % (deepest, full, Env.hc().version, deepest))
E           FatalError: ClassNotFoundException: Class org.apache.hadoop.mapred.DirectFileOutputCommitter not found
E
E           Java stack trace:
E           java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.mapred.DirectFileOutputCommitter not found
E               at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2227)
E               at org.apache.hadoop.mapred.JobConf.getOutputCommitter(JobConf.java:726)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1051)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply$mcV$sp(PairRDDFunctions.scala:1016)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1016)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1016)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1015)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:973)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:971)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:971)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:971)
E               at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply$mcV$sp(RDD.scala:1507)
E               at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1495)
E               at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1495)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1495)
E               at is.hail.utils.richUtils.RichRDD$.writeTable$extension1(RichRDD.scala:77)
E               at is.hail.utils.richUtils.RichRDD$.writeTable$extension0(RichRDD.scala:38)
E               at is.hail.io.vcf.ExportVCF$.apply(ExportVCF.scala:453)
E               at is.hail.variant.VariantDatasetFunctions$.exportVCF$extension(VariantDataset.scala:425)
E               at is.hail.variant.VariantDatasetFunctions.exportVCF(VariantDataset.scala:425)
E               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E               at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E               at java.lang.reflect.Method.invoke(Method.java:498)
E               at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E               at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E               at py4j.Gateway.invoke(Gateway.java:280)
E               at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E               at py4j.commands.CallCommand.execute(CallCommand.java:79)
E               at py4j.GatewayConnection.run(GatewayConnection.java:214)
E               at java.lang.Thread.run(Thread.java:748)java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.mapred.DirectFileOutputCommitter not found
E               at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2195)
E               at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
E               at org.apache.hadoop.mapred.JobConf.getOutputCommitter(JobConf.java:726)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1051)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply$mcV$sp(PairRDDFunctions.scala:1016)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1016)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1016)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1015)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:973)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:971)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:971)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:971)
E               at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply$mcV$sp(RDD.scala:1507)
E               at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1495)
E               at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1495)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1495)
E               at is.hail.utils.richUtils.RichRDD$.writeTable$extension1(RichRDD.scala:77)
E               at is.hail.utils.richUtils.RichRDD$.writeTable$extension0(RichRDD.scala:38)
E               at is.hail.io.vcf.ExportVCF$.apply(ExportVCF.scala:453)
E               at is.hail.variant.VariantDatasetFunctions$.exportVCF$extension(VariantDataset.scala:425)
E               at is.hail.variant.VariantDatasetFunctions.exportVCF(VariantDataset.scala:425)
E               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E               at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E               at java.lang.reflect.Method.invoke(Method.java:498)
E               at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E               at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E               at py4j.Gateway.invoke(Gateway.java:280)
E               at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E               at py4j.commands.CallCommand.execute(CallCommand.java:79)
E               at py4j.GatewayConnection.run(GatewayConnection.java:214)
E               at java.lang.Thread.run(Thread.java:748)java.lang.ClassNotFoundException: Class org.apache.hadoop.mapred.DirectFileOutputCommitter not found
E               at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2101)
E               at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2193)
E               at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2219)
E               at org.apache.hadoop.mapred.JobConf.getOutputCommitter(JobConf.java:726)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1051)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1035)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1035)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply$mcV$sp(PairRDDFunctions.scala:1016)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1016)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:1016)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1015)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:973)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:971)
E               at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:971)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:971)
E               at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply$mcV$sp(RDD.scala:1507)
E               at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1495)
E               at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1495)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
E               at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
E               at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
E               at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1495)
E               at is.hail.utils.richUtils.RichRDD$.writeTable$extension1(RichRDD.scala:77)
E               at is.hail.utils.richUtils.RichRDD$.writeTable$extension0(RichRDD.scala:38)
E               at is.hail.io.vcf.ExportVCF$.apply(ExportVCF.scala:453)
E               at is.hail.variant.VariantDatasetFunctions$.exportVCF$extension(VariantDataset.scala:425)
E               at is.hail.variant.VariantDatasetFunctions.exportVCF(VariantDataset.scala:425)
E               at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
E               at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
E               at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
E               at java.lang.reflect.Method.invoke(Method.java:498)
E               at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
E               at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
E               at py4j.Gateway.invoke(Gateway.java:280)
E               at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
E               at py4j.commands.CallCommand.execute(CallCommand.java:79)
E               at py4j.GatewayConnection.run(GatewayConnection.java:214)
E               at java.lang.Thread.run(Thread.java:748)
E
E
E
E           Hail version: 0.1-74bf1eb
E           Error summary: ClassNotFoundException: Class org.apache.hadoop.mapred.DirectFileOutputCommitter not found

(from github issue):
Hail 0.1 isn’t tested against, or believed to work with, Spark 2.2. Can you update to the Hail 0.2 beta (devel)? 0.1 will be fully deprecated when 0.2 is released, and is already in its end-of-life process.

I’d also recommend that you leave files as BGEN if possible, as it’s an extremely efficient format for encoding huge amounts of imputed genotype data.

Hello and thank you for the prompt response. We have a couple of roadblocks to moving to v0.2:

  1. We use Databricks extensively for dev and exploration and HAIL v0.2 is not able to run on Databricks as of yet.
  2. This is ultimately going into a semi-production tool and so we would like to use officially released stable versions as much as possible.
  3. We wish to utilize ADAM within the same Spark environment as HAIL, and from what I have found, the HAIL KryoSerializer might conflict with the ADAM KryoSerializer, unless you can specify multiple?

With regard to keeping it in BGEN format, we wish to perform liftover from hg19 to hg38 and left-normalization of the data. Is this possible in BGEN format?

Thanks again for your help!

I’ll address each point -

  1. It’s definitely possible to run Hail 0.2; there’s a description from a Databricks solutions architect here: Hail on Databricks with Spark Cluster. It’s possible to make this even easier, as well.

  2. You are of course welcome to use whichever version you want, but we are going to stop supporting 0.1 (with either bugfixes or forum/chat help) pretty soon. The “stability” of the 0.1 release refers solely to the interface – it is a contract that we won’t push backward-incompatible changes. Every commit is tested before it’s deployed, and testing practices are certainly no worse in the 0.2 beta than in 0.1. A stable 0.2 release is also coming, and there probably won’t be huge interface changes before that happens.

  3. This isn’t a concern; you can specify multiple Kryo registrators (see the first sketch after this list). These registrator classes make some serialized objects more compact by replacing class names stored as strings with integer references to registered classes. There should be no incompatibility from using several.

  4. (BGEN) This is a good reason to think about using a Hail native format, yes. Hail can compute minimal representations of variants, but cannot realign against the reference genome right now. We have a liftover function, which is only available in 0.2 (see the second sketch after this list).
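On point 3: spark.kryo.registrator accepts a comma-separated list of registrator classes, so Hail’s and ADAM’s registrators can both be active in one session. A minimal sketch follows; the registrator class names are from memory, so check each project’s documentation for the exact names shipped with your versions:

    from pyspark import SparkConf

    conf = (SparkConf()
            .set("spark.serializer",
                 "org.apache.spark.serializer.KryoSerializer")
            # spark.kryo.registrator takes a comma-separated list, so several
            # projects' registrators can coexist in the same SparkContext.
            .set("spark.kryo.registrator",
                 "is.hail.kryo.HailKryoRegistrator,"                          # assumed Hail class name
                 "org.bdgenomics.adam.serialization.ADAMKryoRegistrator"))    # assumed ADAM class name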
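And on point 4, a rough sketch of what liftover and minimal representation look like in the 0.2 (devel) Python API; the chain-file path is a placeholder, and exact signatures may differ in the devel build you install:

    import hail as hl

    hl.index_bgen('example.10bits.bgen')
    mt = hl.import_bgen('example.10bits.bgen', entry_fields=['GP'],
                        sample_file='example.sample')

    # Lift loci from GRCh37 to GRCh38 (the chain-file path is a placeholder).
    rg37 = hl.get_reference('GRCh37')
    rg38 = hl.get_reference('GRCh38')
    rg37.add_liftover('grch37_to_grch38.over.chain.gz', rg38)
    mt = mt.annotate_rows(new_locus=hl.liftover(mt.locus, 'GRCh38'))

    # Minimal (left-aligned, trimmed) representation of each variant.
    mt = mt.annotate_rows(min_rep=hl.min_rep(mt.locus, mt.alleles))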

Thank you for your response.

I have tried using the information in the post as well as working with Databricks directly, and we have been unable to successfully run HAIL 0.2 on Databricks due to incompatibilities between the Python libraries in the Databricks base image and Python 3.6. Databricks is aware of the issue and is planning a fix eventually. This means that, as of today, only HAIL 0.1 works on Databricks.

I will look forward to a stable 0.2 version.

Thank you very much for the Kryo information. I look forward to trying ADAM and HAIL together in future versions.

Interestingly, the index creation works without issue, unlike the VCF export.

At this juncture, if you are unable or unwilling to troubleshoot this issue, we will have to explore alternatives to HAIL for our purposes. We really wish to use HAIL to leverage Spark parallelization for conversion, but we are unable to simply downgrade the Spark version or upgrade the Python version universally. ADAM is also working on implementing a BGEN reader, so that may be a good resolution as well.

Thanks again for your quick responses and feedback.

@yong could you provide some more background on the Python 3.6 incompatibility?

The issue reported in the post is almost certainly related to Spark 2.2, and we’re not going to invest time in changing 0.1 to work with Spark 2.2. We won’t be offended if you use other tools!

Thank you; I am very interested in @yong’s feedback.

Just an FYI: HAIL 0.1 export_vcf works without issue on Spark 2.2.1 on Databricks; however, I suspect this is due to the custom HDFS layer and jars they use.
