Hail 0.2 - annotation of a table with ~10M rows fails after a few-hour delay

Hello,

I want to ask you about your plans to optimize the Table.annotate() function.
The problem I ran into came from running:

xmap_anno = xmap_anno.annotate(
    src_type = tissue_gene_anno[xmap_anno.src].gene_type,
    src_chr = tissue_gene_anno[xmap_anno.src].chr
)

where xmap_anno has about 10M rows and tissue_gene_anno has about 17K rows.

Running this, the job hangs for hours and then fails, giving the following error:

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 20 times, most recent failure: Lost task 1.25 in stage 9.0 (TID 137, hail-w-1.c.gtex-v8.internal, executor 24): ExecutorLostFailure (executor 24 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 138440 ms
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1517)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1505)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1504)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1504)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1732)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1687)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1676)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2050)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2069)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:132)
at is.hail.rvd.OrderedRVD$.getPartitionKeyInfo(OrderedRVD.scala:478)
at is.hail.rvd.OrderedRVD$.getPartitionKeyInfo(OrderedRVD.scala:488)
at is.hail.rvd.OrderedRVD$.coerce(OrderedRVD.scala:556)
at is.hail.rvd.OrderedRVD$.coerce(OrderedRVD.scala:514)
at is.hail.rvd.OrderedRVD$.coerce(OrderedRVD.scala:495)
at is.hail.expr.TableJoin.execute(Relational.scala:1342)
at is.hail.table.Table.value$lzycompute(Table.scala:221)
at is.hail.table.Table.value(Table.scala:216)
at is.hail.table.Table.x$3$lzycompute(Table.scala:237)
at is.hail.table.Table.x$3(Table.scala:237)
at is.hail.table.Table.rvd$lzycompute(Table.scala:237)
at is.hail.table.Table.rvd(Table.scala:237)
at is.hail.table.Table.copy2$default$1(Table.scala:1162)
at is.hail.table.Table.keyBy(Table.scala:500)
at is.hail.table.Table.keyBy(Table.scala:489)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)

Hail version: devel-63d60cc
Error summary: SparkException: Job aborted due to stage failure: Task 1 in stage 9.0 failed 20 times, most recent failure: Lost task 1.25 in stage 9.0 (TID 137, hail-w-1.c.gtex-v8.internal, executor 24): ExecutorLostFailure (executor 24 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 138440 ms
Driver stacktrace:

Of course, it is possible that this error stems from the fact that I’m doing a lookup from another table. What is your current estimate of the runtime for an annotation/filtering step on a table with on the order of 10M rows? And do you have any plans to optimize this anytime in the near future?

Brian

I should add that the code itself runs fine when I use a test table with ~50k rows (though it is not very fast).

And do you have any plans to optimize this anytime in the near future?

Our team is working full-time on the backend right now. Things are going to get faster every day.

I think this problem might be related to partitioning, though. How many partitions do the two tables have (table.n_partitions())? It’s possible that repartitioning the smaller table to have more partitions will make this orders of magnitude faster.
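For concreteness, here is a minimal sketch of checking and bumping the partition counts; the paths and the partition count of 200 are placeholders for illustration, not values from this thread:

import hail as hl

# Placeholder paths - substitute wherever your tables actually live.
xmap_anno = hl.read_table('gs://my-bucket/xmap_anno.ht')                # ~10M rows
tissue_gene_anno = hl.read_table('gs://my-bucket/tissue_gene_anno.ht')  # ~17K rows

# Check how each table is split across partitions.
print(xmap_anno.n_partitions())
print(tissue_gene_anno.n_partitions())

# Give the smaller lookup table more partitions before the annotate/join.
tissue_gene_anno = tissue_gene_anno.repartition(200)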

I see, that would make sense - let me test this with different partition numbers.

Are you running on preemptible nodes on Google Cloud? There could also be shuffle problems.

The error above comes from a setup with 1 master, 2 workers, and 1 preemptible (32-CPU) worker. Would you recommend simply having 3 workers?

Yes. Preemption wreaks havoc on Spark’s fault-tolerance model. A shuffle puts necessary data on every node, even ephemeral ones, so if any node is lost the full shuffle must be recomputed. If the expected time to preemption is on the same order as your pipeline duration, then you may never finish!

Oh man, that is good to know, thanks! Let me try those two things and circle back to you.

Just letting you know: I tried the same procedure without a preemptible worker, and with more partitions on the smaller table (200 and 400 yielded similar results). Annotating 40M rows with four columns from a lookup table with ~20,000 rows and then filtering now takes ~15 minutes altogether, which is much better than before.
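Continuing the sketch above, the annotate-plus-filter step would look roughly like the following; the filter predicate and output path are purely illustrative assumptions, not taken from this thread:

xmap_anno = xmap_anno.annotate(
    src_type=tissue_gene_anno[xmap_anno.src].gene_type,
    src_chr=tissue_gene_anno[xmap_anno.src].chr,
)

# Hypothetical filter on one of the new annotations.
xmap_anno = xmap_anno.filter(xmap_anno.src_type == 'protein_coding')

# Writing out the result forces the (lazy) pipeline to actually run.
xmap_anno.write('gs://my-bucket/xmap_anno_filtered.ht', overwrite=True)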

Thanks!