I am trying to use import_vcf but I receive the error "ConnectionClosedException: Premature end of Content-Length delimited message body"

Hi! I am trying to use import_vcf to import some VCF files into Hail. My VCF files are stored in Amazon S3, and I am using the s3a:// URI scheme, as you can see below. A minimal, complete example that fails for me is:

>>> indel_files = ['s3a://some/path/fileI1.vcf.gz', 's3a://some/path/fileI2.vcf.gz']
>>> mt1 = hl.import_vcf(indel_files, array_elements_required=False, force_bgz=True)
Hail version: 0.2.19-c6ec8b76eb26
Error summary: ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 419466954; received: 98304

I have included the complete stack trace from the Hail log file at the end of this post. What should I do?

FatalError: ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 1728230; received: 131072

Java stack trace:
java.lang.reflect.InvocationTargetException: null
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.json4s.reflect.Executable.invoke(Executable.scala:52)
	at org.json4s.Extraction$ClassInstanceBuilder.instantiate(Extraction.scala:554)
	at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:597)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:400)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:392)
	at org.json4s.Extraction$.customOrElse(Extraction.scala:606)
	at org.json4s.Extraction$.extract(Extraction.scala:392)
	at org.json4s.Extraction$ClassInstanceBuilder.mkWithTypeHint(Extraction.scala:587)
	at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:596)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:400)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:392)
	at org.json4s.Extraction$.customOrElse(Extraction.scala:606)
	at org.json4s.Extraction$.extract(Extraction.scala:392)
	at org.json4s.Extraction$.extract(Extraction.scala:39)
	at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
	at org.json4s.jackson.Serialization$.read(Serialization.scala:50)
	at org.json4s.Serialization$class.read(Serialization.scala:30)
	at org.json4s.jackson.Serialization$.read(Serialization.scala:17)
	at is.hail.expr.ir.IRParser$.deserialize(Parser.scala:146)
	at is.hail.expr.ir.IRParser$.matrix_ir_1(Parser.scala:1260)
	at is.hail.expr.ir.IRParser$.matrix_ir(Parser.scala:1197)
	at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1437)
	at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1437)
	at is.hail.expr.ir.IRParser$.parse(Parser.scala:1421)
	at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1437)
	at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1436)
	at is.hail.expr.ir.IRParser.parse_matrix_ir(Parser.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 192.168.226.22, executor 6): org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 1728230; received: 131072
	at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
	at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:198)
	at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:101)
	at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:166)
	at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
	at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:172)
	at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
	at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
	at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
	at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
	at com.amazonaws.services.s3.model.S3ObjectInputStream.abort(S3ObjectInputStream.java:90)
	at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:199)
	at java.base/java.io.FilterInputStream.close(FilterInputStream.java:180)
	at org.apache.hadoop.io.compress.CompressionInputStream.close(CompressionInputStream.java:63)
	at is.hail.utils.package$.using(package.scala:598)
	at is.hail.io.fs.HadoopFS.readFile(HadoopFS.scala:401)
	at is.hail.io.fs.HadoopFS.readLines(HadoopFS.scala:413)
	at is.hail.io.vcf.LoadVCF$.getHeaderLines(LoadVCF.scala:1237)
	at is.hail.io.vcf.MatrixVCFReader$$anonfun$13.apply(LoadVCF.scala:1489)
	at is.hail.io.vcf.MatrixVCFReader$$anonfun$13.apply(LoadVCF.scala:1486)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:927)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:925)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.foreach(RDD.scala:925)
	at is.hail.io.vcf.MatrixVCFReader.<init>(LoadVCF.scala:1486)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at org.json4s.reflect.Executable.invoke(Executable.scala:52)
	at org.json4s.Extraction$ClassInstanceBuilder.instantiate(Extraction.scala:554)
	at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:597)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:400)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:392)
	at org.json4s.Extraction$.customOrElse(Extraction.scala:606)
	at org.json4s.Extraction$.extract(Extraction.scala:392)
	at org.json4s.Extraction$ClassInstanceBuilder.mkWithTypeHint(Extraction.scala:587)
	at org.json4s.Extraction$ClassInstanceBuilder.result(Extraction.scala:596)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:400)
	at org.json4s.Extraction$$anonfun$extract$6.apply(Extraction.scala:392)
	at org.json4s.Extraction$.customOrElse(Extraction.scala:606)
	at org.json4s.Extraction$.extract(Extraction.scala:392)
	at org.json4s.Extraction$.extract(Extraction.scala:39)
	at org.json4s.ExtractableJsonAstNode.extract(ExtractableJsonAstNode.scala:21)
	at org.json4s.jackson.Serialization$.read(Serialization.scala:50)
	at org.json4s.Serialization$class.read(Serialization.scala:30)
	at org.json4s.jackson.Serialization$.read(Serialization.scala:17)
	at is.hail.expr.ir.IRParser$.deserialize(Parser.scala:146)
	at is.hail.expr.ir.IRParser$.matrix_ir_1(Parser.scala:1260)
	at is.hail.expr.ir.IRParser$.matrix_ir(Parser.scala:1197)
	at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1437)
	at is.hail.expr.ir.IRParser$$anonfun$parse_matrix_ir$2.apply(Parser.scala:1437)
	at is.hail.expr.ir.IRParser$.parse(Parser.scala:1421)
	at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1437)
	at is.hail.expr.ir.IRParser$.parse_matrix_ir(Parser.scala:1436)
	at is.hail.expr.ir.IRParser.parse_matrix_ir(Parser.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 1728230; received: 131072
	at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
	at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:198)
	at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:101)
	at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:166)
	at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
	at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:172)
	at java.io.FilterInputStream.close(FilterInputStream.java:180)
	at java.io.FilterInputStream.close(FilterInputStream.java:180)
	at java.io.FilterInputStream.close(FilterInputStream.java:180)
	at java.io.FilterInputStream.close(FilterInputStream.java:180)
	at com.amazonaws.services.s3.model.S3ObjectInputStream.abort(S3ObjectInputStream.java:90)
	at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:199)
	at java.io.FilterInputStream.close(FilterInputStream.java:180)
	at org.apache.hadoop.io.compress.CompressionInputStream.close(CompressionInputStream.java:63)
	at is.hail.utils.package$.using(package.scala:598)
	at is.hail.io.fs.HadoopFS.readFile(HadoopFS.scala:401)
	at is.hail.io.fs.HadoopFS.readLines(HadoopFS.scala:413)
	at is.hail.io.vcf.LoadVCF$.getHeaderLines(LoadVCF.scala:1237)
	at is.hail.io.vcf.MatrixVCFReader$$anonfun$13.apply(LoadVCF.scala:1489)
	at is.hail.io.vcf.MatrixVCFReader$$anonfun$13.apply(LoadVCF.scala:1486)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:927)
	at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$27.apply(RDD.scala:927)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(Thread.java:834)

I’m sorry to hear you have encountered this problem!

The short answer is that Hadoop's S3A file system connector and the AWS S3 client library (both of which Hail depends on to read files stored in S3) do not properly handle closing a stream after reading only a prefix of a file. In particular, Hail only needs to read the header lines from your VCF. You can work around this by creating a file that contains only the VCF header lines and passing that file as the header_file parameter:

indel_files = ['s3a://some/path/fileI1.vcf.gz', 's3a://some/path/fileI2.vcf.gz']
mt1 = hl.import_vcf(indel_files,
                    header_file='s3a://some/path/just_the_vcf_header_lines.txt',
                    array_elements_required=False,
                    force_bgz=True)
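
If it helps, here is one way you might generate that header-only file. Treat it as a minimal sketch rather than the official recipe: the paths are placeholders taken from your example, and it assumes you are comfortable copying one of the (block-gzipped) VCFs to the driver's local disk so Python's gzip module can read it.

import gzip
import hail as hl

src = 's3a://some/path/fileI1.vcf.gz'                          # any one of the input VCFs
header_path = 's3a://some/path/just_the_vcf_header_lines.txt'  # where the header-only file will live

# Copy one VCF to the driver's local disk; bgzip files are valid
# (multi-member) gzip files, so Python's gzip module can decompress them.
hl.hadoop_copy(src, 'file:///tmp/fileI1.vcf.gz')

# Keep every line up to and including the #CHROM line.
header_lines = []
with gzip.open('/tmp/fileI1.vcf.gz', 'rt') as vcf:
    for line in vcf:
        header_lines.append(line)
        if line.startswith('#CHROM'):
            break

# Write the header lines back to S3 so the whole cluster can read them.
with hl.hadoop_open(header_path, 'w') as out:
    out.writelines(header_lines)

Then pass header_path as the header_file argument, exactly as shown above.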

Read on if you’re curious why this is happening.

Let us carefully examine the deepest part of the worker stack trace from the Hail log file.

org.apache.http.ConnectionClosedException: Premature end of Content-Length delimited message body (expected: 1728230; received: 131072
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:178)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:198)
    at org.apache.http.impl.io.ContentLengthInputStream.close(ContentLengthInputStream.java:101)
    at org.apache.http.conn.BasicManagedEntity.streamClosed(BasicManagedEntity.java:166)
    at org.apache.http.conn.EofSensorInputStream.checkClose(EofSensorInputStream.java:228)
    at org.apache.http.conn.EofSensorInputStream.close(EofSensorInputStream.java:172)
    at java.io.FilterInputStream.close(FilterInputStream.java:180)
    at java.io.FilterInputStream.close(FilterInputStream.java:180)
    at java.io.FilterInputStream.close(FilterInputStream.java:180)
    at java.io.FilterInputStream.close(FilterInputStream.java:180)
    at com.amazonaws.services.s3.model.S3ObjectInputStream.abort(S3ObjectInputStream.java:90)
    at org.apache.hadoop.fs.s3a.S3AInputStream.close(S3AInputStream.java:199)
    at java.io.FilterInputStream.close(FilterInputStream.java:180)
    at org.apache.hadoop.io.compress.CompressionInputStream.close(CompressionInputStream.java:63)
    at is.hail.utils.package$.using(package.scala:598)
        ...

I have removed all stack frames above the deepest Hail frame. That frame comes from a resource-management primitive called using, which closes input streams when Hail has finished reading from a file. Normally this kind of operation is quick. Unfortunately, when Hail closes the input stream after reading only part of the file (for example, just the header lines of a VCF), a series of unfortunate events unfolds.
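
For readers who do not know Scala, the helper boils down to something like the following rough Python analogue (an illustration of the pattern, not the actual Hail source):

from contextlib import contextmanager

@contextmanager
def using(resource):
    # Hand the resource to the caller's block, then close it no matter what,
    # even if the caller consumed only a small prefix of the stream.
    try:
        yield resource
    finally:
        resource.close()

On S3A, that final close call is what sets off the chain of events described below.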

I cannot find the exact source for the S3AInputStream version you appear to have (that line number does not match the S3A source on GitHub; it points past the end of the file). However, S3AInputStream relies on the Apache HTTP client libraries, and S3ObjectInputStream (which, you will note, S3AInputStream tells to abort) carries an ominous comment on its abort method:

Aborts the underlying http request without reading any more data and closes the
stream. By default Apache HttpClient tries to reuse http connections by
reading to the end of an attached input stream on InputStream#close(). This
is efficient from a socket pool management perspective, but for objects with
large payloads can incur significant overhead while bytes are read from s3 and
discarded. It’s up to clients to decide when to take the performance hit
implicit in not reusing an http connection in order to not read unnecessary
information from S3.

S3A was designed to avoid exactly this cost: its close calls abort rather than draining the stream. Unfortunately, abort appears to interact poorly with the Apache HTTP client library underneath it. Following the stack trace deeper, we transition from abort back into a chain of close calls and eventually reach ContentLengthInputStream.read, which is precisely what we did not want: ContentLengthInputStream.close tries to drain the remainder of the response body, in effect reading the rest of the file.

I suspect that, by this point, S3A has destroyed enough of the run-time state of the Apache HTTP client library that ContentLengthInputStream.read can no longer read from the connection, which of course raises an exception. The S3A libraries do not handle this exception, and neither does Hail. I do not want to change Hail to handle it because it is too specific to S3A, and the workaround is rather simple: pass a header_file to import_vcf.

Thank you very much. Defining the header_file solved the issue.