Dear all,
We are migrating from HDFS to S3, and our existing pipeline now raises the following error:
Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 4.0 failed 4 times, most recent failure: Lost task 2.3 in stage 4.0 (TID 29) (10.0.99.51 executor 0): java.io.InterruptedIOException: getFileStatus on s3a://.../XXXX.vcf.gz.tbi: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
[...]
Caused by: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
[...]
Caused by: com.amazonaws.thirdparty.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
[...]
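If it helps, my reading of the trace is that the S3A client's HTTP connection pool is exhausted, so each new getFileStatus call waits for a free connection and eventually times out. From the Hadoop S3A documentation I understand the pool size is controlled by fs.s3a.connection.maximum; a minimal sketch of raising it (untested on our side, the values are guesses, and `spark` is the session built below):

# Hypothetical mitigation: enlarge the S3A connection pool and the
# thread pool that draws from it.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.setInt('fs.s3a.connection.maximum', 100)
hadoop_conf.setInt('fs.s3a.threads.max', 64)

I suspect this would only postpone the failure if connections are being leaked rather than returned to the pool.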
After some tests, I found that I can load 3-4 gVCFs with our pipeline, but loading more of them, either at the same time or iteratively, raises the same error.
The setup is:
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark_conf = SparkConf().setAppName('genetic-pipeline')
spark = SparkSession.builder.config(conf = spark_conf).getOrCreate()

hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.setInt('dfs.block.size', 536870912)      # 512 MB
hadoop_conf.setInt('parquet.block.size', 536870912)  # 512 MB
hadoop_conf.set('fs.s3a.access.key', 'ABC')
hadoop_conf.set('fs.s3a.secret.key', 'ABC')
hadoop_conf.set('fs.s3a.endpoint', 'X.X.X.X:YYYY')
# Note: spark.python.worker.reuse is a Spark property, not a Hadoop one,
# so setting it on the Hadoop configuration likely has no effect.
hadoop_conf.set('spark.python.worker.reuse', 'true')
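In case it matters, my understanding is that Spark also forwards any SparkConf entry prefixed with spark.hadoop. into the Hadoop configuration, so the S3A options could be set before the first filesystem instance is created. A sketch of that variant (same placeholder credentials as above):

spark_conf = SparkConf().setAppName('genetic-pipeline') \
    .set('spark.hadoop.fs.s3a.access.key', 'ABC') \
    .set('spark.hadoop.fs.s3a.secret.key', 'ABC') \
    .set('spark.hadoop.fs.s3a.endpoint', 'X.X.X.X:YYYY') \
    .set('spark.python.worker.reuse', 'true')  # a Spark property, set where it belongs
spark = SparkSession.builder.config(conf = spark_conf).getOrCreate()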
The code I am using follows (the interval dict and the experiment lists are global):
import hail as hl
# Combiner helpers (exact import path may differ across Hail versions)
from hail.experimental.vcf_combiner.vcf_combiner import combine_gvcfs, transform_gvcf

def transformFile(mt):
    # Add the INFO fields the combiner expects but our gVCFs lack.
    return transform_gvcf(mt.annotate_rows(
        info = mt.info.annotate(
            MQ_DP = hl.missing(hl.tint32),
            VarDP = hl.missing(hl.tint32),
            QUALapprox = hl.missing(hl.tint32)
        )
    ))

def importFiles(files):
    return hl.import_vcfs(
        files,
        partitions = interval['interval'],
        reference_genome = interval['reference_genome'],
        array_elements_required = interval['array_elements_required']
    )
# First pack of 3 gVCFs
vcfs = [ transformFile(mt) for mt in importFiles(experiments_1) ]
comb1 = combine_gvcfs(vcfs)
comb1.write("s3a://.../test1", overwrite = True)

# Second pack of 3 gVCFs
vcfs = [ transformFile(mt) for mt in importFiles(experiments_2) ]
comb2 = combine_gvcfs(vcfs)
comb2.write("s3a://.../test2", overwrite = True)  # <-- it stops here, raising the error from import_vcfs

# Final combine of both packs
comb = combine_gvcfs([comb1, comb2])
comb.write("s3a://.../test", overwrite = True)
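One workaround I am considering is re-reading each written pack from S3 before the final combine, so that the last step does not keep the original gVCF import pipelines behind comb1 and comb2 alive. A sketch (not yet verified to avoid the timeout):

# Hypothetical variant: re-read the packs already written to S3 instead of
# reusing the in-memory plans that still reference the source gVCFs.
comb1 = hl.read_matrix_table("s3a://.../test1")
comb2 = hl.read_matrix_table("s3a://.../test2")
comb = combine_gvcfs([comb1, comb2])
comb.write("s3a://.../test", overwrite = True)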
Any help elucidating why the connections are not reused and/or not terminated would be welcome.