Union of columns

Hi,

I am running into resource issues when trying to union two large matrix tables over columns. I have multiple matrix tables imported from VCFs, and I want to combine them into a single matrix table, so I use mt_A.union_cols(mt_B, 'outer'). That way I get the outer join of my variants as rows, with all my samples appended as columns.

When I try to write the resulting matrix table, I get the error:

Hail version: 0.2.32-a5876a0a2853
Error summary: ConnectionPoolTimeoutException: Timeout waiting for connection from pool

It seems to be mostly due to lack of RAM. By adding RAM to my cluster, I was able to merge more samples… but now I have reached the point where I cannot add more RAM to my nodes, and I still get the timeout error…

Am I missing something? Is there a way I could reformulate my code so that it works without hitting the RAM limit? I am not sure what happens under the hood of Hail and union_cols…
In my previous tests, more RAM did help (but I have reached the limit on my nodes), while more CPU or disk space did not seem to help… Should I repartition my matrix tables? I have never tried repartitioning and am not sure what it impacts…

Thanks

Hey @mhebrard,

Sorry you’re running into this issue. We need more information before we can help. Can you please share:

  • the hail log file from a failing run,
  • the full command you used to start the cluster,
  • the number of rows and columns in each matrix table,
  • how many partitions are in each matrix table, and
  • as close to the full script as possible?

union_cols is very sensitive to dataset partitioning. We have some very preliminary work to make it more resilient. @chrisvittal can provide more context on how/if that is helpful in your case.
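In the meantime, one thing worth checking is how the two tables are partitioned relative to each other. As a rough sketch (paths and the target partition count below are placeholders, and this assumes a running Spark cluster with Hail installed), you can inspect partition counts and rewrite one table with a different partitioning before the union:

```python
import hail as hl  # requires a Spark cluster with Hail installed

mt_a = hl.read_matrix_table('s3://bucket/a.mt')  # placeholder paths
mt_b = hl.read_matrix_table('s3://bucket/b.mt')

# Inspect how each table is partitioned.
print(mt_a.n_partitions(), mt_b.n_partitions())

# If the counts are wildly mismatched, rewriting one table with fewer,
# larger partitions before the union may reduce pressure; the target
# count of 5000 here is purely illustrative.
mt_b = mt_b.repartition(5000)
mt_b.write('s3://bucket/b.repart.mt', overwrite=True)
mt_b = hl.read_matrix_table('s3://bucket/b.repart.mt')

mt_union = mt_a.union_cols(mt_b, row_join_type='outer')
```

Writing and re-reading after repartition avoids recomputing the shuffle in the downstream pipeline, though whether this helps depends on the workload.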

Hi @danking

Here is some additional info:

I run Hail v0.2.32 on an AWS EMR cluster created with CloudFormation; the recipe can be found there.

For this union task, I created a cluster of 1 MASTER + 2 CORE nodes using instance type r5.24xlarge, i.e. 96 CPUs and 768 GB RAM per node.

I run the code through a Zeppelin notebook as follows:

# on AWS EMR cluster - 1 MASTER + 2 CORES - r5.24xlarge (96 CPU - 768Gb RAM) - 500GB Disk space
# Import hail
import hail as hl
hl.init(sc)
# Running on Apache Spark version 2.4.4
# SparkUI available at http://ip-172-31-2-192.ap-southeast-1.compute.internal:4040
# Welcome to
#      __  __     <>__
#     / /_/ /__  __/ /
#    / __  / _ `/ / /
#   /_/ /_/\_,_/_/_/   version 0.2.32-a5876a0a2853
# LOGGING: writing to /mnt/var/lib/zeppelin/hail-20200226-0207-0.2.32-a5876a0a2853.log

# Load left mt
mt_left = hl.read_matrix_table('s3://<path>/n7337.vcf.mt')
# 7,337 cols x 157,563,374 rows in 29,504 partitions

# Load right mt
mt_right = hl.read_matrix_table('s3://<path>/n2986.vcf.mt')
# 2,986 cols x 94,911,949 rows in 15,502 partitions

# Union
mt_union = mt_left.union_cols(mt_right, 'outer')

# Write Union mt
mt_union.write('s3://<path>/n10323.vcf.mt', overwrite=True)

# Took 3 min 19 sec.
# Fail to execute line 8: mt_union.write("s3://npmchorus-gnomad/hailoutput/SG10K_Health_maxi.n10323.jc.VQSR-pass-only.split-multiallelic.vcf.mt", overwrite=True)
# Traceback (most recent call last):
#   File "/tmp/zeppelin_pyspark-7521024825194216637.py", line 380, in <module>
#     exec(code, _zcUserQueryNameSpace)
#   File "<stdin>", line 8, in <module>
#   File "</usr/local/lib/python3.6/site-packages/decorator.py:decorator-gen-1060>", line 2, in write
#   File "/opt/hail/python/hail/typecheck/check.py", line 585, in wrapper
#     return __original_func(*args_, **kwargs_)
#   File "/opt/hail/python/hail/matrixtable.py", line 2522, in write
#     Env.backend().execute(MatrixWrite(self._mir, writer))
#   File "/opt/hail/python/hail/backend/backend.py", line 109, in execute
#     result = json.loads(Env.hc()._jhc.backend().executeJSON(self._to_java_ir(ir)))
#   File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
#     answer, self.gateway_client, self.target_id, self.name)
#   File "/opt/hail/python/hail/utils/java.py", line 225, in deco
#     'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
# hail.utils.java.FatalError: ConnectionPoolTimeoutException: Timeout waiting for connection from pool

Here is the full Hail log:
hail-20200226-0207-0.2.32-a5876a0a2853.log (1.4 MB)

Thanks

I do not know whether the following is related to your problem, but it appears in the log. You might want to verify it is not important.

2020-02-26 02:09:49 UserData: ERROR: Error encountered while try to get user data
java.io.IOException: File '/var/aws/emr/userData.json' cannot be read
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.commons.io.FileUtils.openInputStream(FileUtils.java:296)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1711)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.commons.io.FileUtils.readFileToString(FileUtils.java:1748)
	at com.amazon.ws.emr.hadoop.fs.util.UserData.getUserData(UserData.java:62)
	at com.amazon.ws.emr.hadoop.fs.util.UserData.<init>(UserData.java:39)
	at com.amazon.ws.emr.hadoop.fs.util.UserData.ofDefaultResourceLocations(UserData.java:52)
	at com.amazon.ws.emr.hadoop.fs.util.AWSSessionCredentialsProviderFactory.buildSTSClient(AWSSessionCredentialsProviderFactory.java:52)
	at com.amazon.ws.emr.hadoop.fs.util.AWSSessionCredentialsProviderFactory.<clinit>(AWSSessionCredentialsProviderFactory.java:17)
	at com.amazon.ws.emr.hadoop.fs.rolemapping.DefaultS3CredentialsResolver.resolve(DefaultS3CredentialsResolver.java:22)
	at com.amazon.ws.emr.hadoop.fs.guice.CredentialsProviderOverrider.override(CredentialsProviderOverrider.java:25)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.executeOverriders(GlobalS3Executor.java:163)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:102)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:189)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:184)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:96)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:43)
	at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:214)
	at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:780)
	at org.apache.hadoop.fs.FileSystem.isDirectory(FileSystem.java:1453)
	at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.isDirectory(EmrFileSystem.java:357)
	at is.hail.io.fs.HadoopFS.isDir(HadoopFS.scala:179)
	at is.hail.expr.ir.RelationalSpec$.readMetadata(AbstractMatrixTableSpec.scala:30)
	at is.hail.expr.ir.RelationalSpec$.readReferences(AbstractMatrixTableSpec.scala:62)
	at is.hail.variant.ReferenceGenome$.fromHailDataset(ReferenceGenome.scala:586)
	at is.hail.variant.ReferenceGenome.fromHailDataset(ReferenceGenome.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

Something indeed seems wrong with S3. Later in the log you hit over one hundred of these errors:

2020-02-26 02:11:43 TaskSetManager: WARN: Lost task 84.0 in stage 0.0 (TID 84, ip-172-31-10-171.ap-southeast-1.compute.internal, executor 3): com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1175)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1121)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:770)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:744)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:726)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:686)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:668)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:532)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:512)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4926)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4872)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:390)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:5806)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1794)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1754)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:33)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.call.PutObjectCall.performCall(PutObjectCall.java:8)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.call.AbstractUploadingS3Call.perform(AbstractUploadingS3Call.java:61)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:109)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:189)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:184)
	at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.putObject(AmazonS3LiteClient.java:107)
	at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.storeFile(Jets3tNativeFileSystemStore.java:168)
	at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.uploadSinglePart(MultipartUploadOutputStream.java:208)
	at com.amazon.ws.emr.hadoop.fs.s3n.MultipartUploadOutputStream.close(MultipartUploadOutputStream.java:423)
	at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:74)
	at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:108)
	at is.hail.utils.richUtils.ByteTrackingOutputStream.close(ByteTrackingOutputStream.scala:23)
	at is.hail.io.StreamBlockOutputBuffer.close(OutputBuffers.scala:264)
	at is.hail.io.LZ4OutputBlockBuffer.close(OutputBuffers.scala:280)
	at is.hail.io.BlockingOutputBuffer.close(OutputBuffers.scala:182)
	at is.hail.io.LEB128OutputBuffer.close(OutputBuffers.scala:121)
	at is.hail.io.CompiledEncoder.close(Encoder.scala:27)
	at is.hail.utils.package$.using(package.scala:598)
	at is.hail.io.RichContextRDDRegionValue$$anonfun$2.apply(RichContextRDDRegionValue.scala:97)
	at is.hail.io.RichContextRDDRegionValue$$anonfun$2.apply(RichContextRDDRegionValue.scala:95)
	at is.hail.utils.package$.using(package.scala:596)
	at is.hail.io.fs.HadoopFS.writeFile(HadoopFS.scala:406)
	at is.hail.io.RichContextRDDRegionValue$.writeSplitRegion(RichContextRDDRegionValue.scala:95)
	at is.hail.rvd.RVD$$anonfun$25.apply(RVD.scala:901)
	at is.hail.rvd.RVD$$anonfun$25.apply(RVD.scala:899)
	at is.hail.sparkextras.ContextRDD$$anonfun$cmapPartitionsWithIndex$1$$anonfun$apply$16.apply(ContextRDD.scala:281)
	at is.hail.sparkextras.ContextRDD$$anonfun$cmapPartitionsWithIndex$1$$anonfun$apply$16.apply(ContextRDD.scala:281)
	at is.hail.sparkextras.ContextRDD$$anonfun$run$1$$anonfun$apply$8.apply(ContextRDD.scala:219)
	at is.hail.sparkextras.ContextRDD$$anonfun$run$1$$anonfun$apply$8.apply(ContextRDD.scala:219)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$13.apply(RDD.scala:945)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.conn.ConnectionPoolTimeoutException: Timeout waiting for connection from pool
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.leaseConnection(PoolingHttpClientConnectionManager.java:314)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager$1.get(PoolingHttpClientConnectionManager.java:280)
	at sun.reflect.GeneratedMethodAccessor1993.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.ClientConnectionRequestFactory$Handler.invoke(ClientConnectionRequestFactory.java:70)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.conn.$Proxy36.get(Unknown Source)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:190)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
	at com.amazon.ws.emr.hadoop.fs.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:56)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.apache.client.impl.SdkHttpClient.execute(SdkHttpClient.java:72)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1297)
	at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1113)
	... 68 more

Some internet searching suggests you should modify fs.s3a.connection.maximum.
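As a sketch of one way to pass this through without rebuilding the cluster, the Hadoop property can be supplied via the Spark configuration when Hail starts the SparkContext itself (this won't apply if an existing context is passed in, as in a Zeppelin notebook; which property name applies depends on whether you use the Hadoop S3A connector or EMRFS, and the values here are illustrative):

```python
import hail as hl

# Raise the S3 connection pool size. fs.s3a.* applies to the Hadoop
# S3A connector; fs.s3.maxConnections applies to EMRFS on EMR.
hl.init(spark_conf={
    'spark.hadoop.fs.s3a.connection.maximum': '100',
    'spark.hadoop.fs.s3.maxConnections': '100',
})
```

On EMR, setting the property cluster-wide through an emrfs-site configuration classification is an alternative to per-job Spark conf.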


Thanks @danking,

I looked into the max-connections setting, and it actually solved my issue!

For reference, I added the following configuration to the cluster recipe:

Configurations:
  - Classification: emrfs-site
    ConfigurationProperties:
      fs.s3.maxConnections: 100

I am staying with fs.s3 for now (not using fs.s3a).
