PCA job aborted with SparkException

I was able to boost the executor memory somewhat by setting 'spark.executor.memory': '18g', which is about as high as I can go even though my worker node memory is set to 26G. I am still getting a SparkException, though the error message now says I am using 20 GB of 20 GB physical memory (see below).
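For reference, here is roughly how those Spark settings can be passed when you start Hail yourself (a minimal sketch using hl.init's spark_conf argument; on Terra the same values normally come from the cloud environment configuration, and they have to be set before the SparkContext starts):

```
import hail as hl

# Sketch only: raise the executor heap plus the off-heap overhead that the
# YARN error below suggests boosting; the right values depend on worker size.
hl.init(spark_conf={
    'spark.executor.memory': '18g',
    'spark.yarn.executor.memoryOverhead': '4g',
})
```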

Is there anything I can do to get this to run? Perhaps ~600k variants is still too memory-intensive, but I can't really prune this table any further since ld_prune() is also quite memory-intensive.

I appreciate all of your help on this!

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-25-e3aac58407be> in <module>
      1 # Compute PCA
----> 2 eigenvalues, pcs, _ = hl.hwe_normalized_pca(gen_filt.GT, k=10, compute_loadings=False)
      3 #eigenvalues, pcs, _ = hl.hwe_normalized_pca(gen_prune.GT, k=10, compute_loadings=False)

<decorator-gen-1549> in hwe_normalized_pca(call_expr, k, compute_loadings)

/usr/local/lib/python3.7/dist-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    612     def wrapper(__original_func, *args, **kwargs):
    613         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 614         return __original_func(*args_, **kwargs_)
    615 
    616     return wrapper

/usr/local/lib/python3.7/dist-packages/hail/methods/statgen.py in hwe_normalized_pca(call_expr, k, compute_loadings)
   1593     return pca(normalized_gt,
   1594                k,
-> 1595                compute_loadings)
   1596 
   1597 

<decorator-gen-1551> in pca(entry_expr, k, compute_loadings)

/usr/local/lib/python3.7/dist-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    612     def wrapper(__original_func, *args, **kwargs):
    613         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 614         return __original_func(*args_, **kwargs_)
    615 
    616     return wrapper

/usr/local/lib/python3.7/dist-packages/hail/methods/statgen.py in pca(entry_expr, k, compute_loadings)
   1695         'entryField': field,
   1696         'k': k,
-> 1697         'computeLoadings': compute_loadings
   1698     })).persist())
   1699 

<decorator-gen-1095> in persist(self, storage_level)

/usr/local/lib/python3.7/dist-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    612     def wrapper(__original_func, *args, **kwargs):
    613         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 614         return __original_func(*args_, **kwargs_)
    615 
    616     return wrapper

/usr/local/lib/python3.7/dist-packages/hail/table.py in persist(self, storage_level)
   1834             Persisted table.
   1835         """
-> 1836         return Env.backend().persist_table(self, storage_level)
   1837 
   1838     def unpersist(self) -> 'Table':

/usr/local/lib/python3.7/dist-packages/hail/backend/spark_backend.py in persist_table(self, t, storage_level)
    313 
    314     def persist_table(self, t, storage_level):
--> 315         return Table._from_java(self._jbackend.pyPersistTable(storage_level, self._to_java_table_ir(t._tir)))
    316 
    317     def unpersist_table(self, t):

/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/local/lib/python3.7/dist-packages/hail/backend/spark_backend.py in deco(*args, **kwargs)
     39             raise FatalError('%s\n\nJava stack trace:\n%s\n'
     40                              'Hail version: %s\n'
---> 41                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
     42         except pyspark.sql.utils.CapturedException as e:
     43             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: SparkException: Job aborted due to stage failure: Task 10 in stage 3.0 failed 4 times, most recent failure: Lost task 10.3 in stage 3.0 (TID 90, saturn-3aae50e8-e65e-4fe4-b240-b4d8c7ad9956-w-18.c.mycompany-research-and-development.internal, executor 22): ExecutorLostFailure (executor 22 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits.  20.0 GB of 20 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
Driver stacktrace:

Java stack trace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 3.0 failed 4 times, most recent failure: Lost task 10.3 in stage 3.0 (TID 90, saturn-3aae50e8-e65e-4fe4-b240-b4d8c7ad9956-w-18.c.mycompany-research-and-development.internal, executor 22): ExecutorLostFailure (executor 22 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits.  20.0 GB of 20 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at is.hail.sparkextras.ContextRDD.collect(ContextRDD.scala:166)
	at is.hail.rvd.RVD.countPerPartition(RVD.scala:744)
	at is.hail.expr.ir.MatrixValue.toRowMatrix(MatrixValue.scala:241)
	at is.hail.methods.PCA.execute(PCA.scala:33)
	at is.hail.expr.ir.functions.WrappedMatrixToTableFunction.execute(RelationalFunctions.scala:49)
	at is.hail.expr.ir.TableToTableApply.execute(TableIR.scala:2409)
	at is.hail.expr.ir.Interpret$.apply(Interpret.scala:23)
	at is.hail.backend.spark.SparkBackend$$anonfun$pyPersistTable$1.apply(SparkBackend.scala:402)
	at is.hail.backend.spark.SparkBackend$$anonfun$pyPersistTable$1.apply(SparkBackend.scala:401)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:20)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:18)
	at is.hail.utils.package$.using(package.scala:601)
	at is.hail.annotations.Region$.scoped(Region.scala:18)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:18)
	at is.hail.backend.spark.SparkBackend.withExecuteContext(SparkBackend.scala:229)
	at is.hail.backend.spark.SparkBackend.pyPersistTable(SparkBackend.scala:401)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)



Hail version: 0.2.45-a45a43f21e83
Error summary: SparkException: Job aborted due to stage failure: Task 10 in stage 3.0 failed 4 times, most recent failure: Lost task 10.3 in stage 3.0 (TID 90, saturn-3aae50e8-e65e-4fe4-b240-b4d8c7ad9956-w-18.c.mycompany-research-and-development.internal, executor 22): ExecutorLostFailure (executor 22 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits.  20.0 GB of 20 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.
Driver stacktrace:

Ah, I’m 99% sure this is fixed by https://github.com/hail-is/hail/pull/9009

We haven’t released since then, though. I’ll try to create a release this afternoon.

awesome!!

Is this a bugfix that specifically applies to the PCA function, or the prune function, or memory usage across the board?

This fixes a memory leak in infrastructure that was used in a few places: exportBGEN, ld_prune, PCA, and a couple of other things that use block matrix / linear algebra.

Quite a bad bug :sweat:


ah, we need to fix something else before we release. I’ll post updates here.

Sounds good, let me know when it is released!

The offices are closed today and we’re having a few issues with our CI system, so I think tomorrow morning is a good bet for a release.

Ah, I thought the change that fixed this bug was stacked on top of the current release, but it was actually in 0.2.47! I’m drafting the next release now, but you should be able to get going on 0.2.47.

I am now using Hail 0.2.49, and it seems like hl.hwe_normalized_pca() may be working, but I have a question about runtime.

Before the memory leaks were patched, the function would terminate soon after displaying “2020-07-14 01:36:49 Hail: INFO: hwe_normalized_pca: running PCA using 559712 variants.”

Now it is progressing to the next step “2020-07-14 03:09:56 Hail: INFO: pca: running PCA with 10 components…”

The issue is the runtime: so far it has been running for over 24 hours. I started this run around 7pm on Monday 7/13, and I am using a Terra notebook with 53 nodes. I debated stopping the run last night, but the Jupyter kernel still seems to be responsive.

Is this sort of runtime normal for 39628 samples and 581323 variants or do you think I should terminate and attempt to rerun?

Is there a way to run this function with more verbosity?

Really, nothing except the most massive pipelines (100s of TBs of data processed) should take a day on a cluster that size.

PCA is quite sensitive to partitioning, though. The best place to get more information about this is either the Hail log or the Spark UI. Do you have the Hail log file? It will probably be big, but that's the thing to look at first.
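For example, a quick way to check how a MatrixTable is partitioned before kicking off PCA (a sketch; the path is a placeholder):

```
import hail as hl

mt = hl.read_matrix_table('gs://my-bucket/my_data.mt')  # placeholder path
print(mt.n_partitions())  # very few, very large partitions often cause this kind of memory pressure
print(mt.count())         # (n_rows, n_cols), to judge rows per partition
```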

How would I get the Hail log file?

You're running this in a notebook in Terra, right? You can let the thing run for a few minutes, then interrupt that cell, and run:

hl.upload_log('gs://path/to/some/bucket/you/own')

Yep, thank you! I am running some other analyses right now but I’ll post a log tomorrow if I still have issues upon rerunning.

@tpoterba it looks like upload_log was removed from the codebase

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-39-93dcd1342068> in <module>
----> 1 hl.upload_log(bucket+"/pca_run.log")

AttributeError: module 'hail' has no attribute 'upload_log'

I found this issue that was closed in April of this year: https://github.com/hail-is/hail/issues/7392

Notably, since switching to 0.2.49 I was able to run hl.ld_prune() successfully (runtime was approx. 4 hours), which brought my variant row count down from 580k to 340k with r2=0.2. Previously this function produced a SparkException, so we are getting somewhere!
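The pruning step was roughly the standard pattern (a sketch; gen_filt is the filtered MatrixTable from earlier in the thread):

```
import hail as hl

# ld_prune returns a Table keyed by locus/alleles of the variants to keep
pruned = hl.ld_prune(gen_filt.GT, r2=0.2)
gen_prune = gen_filt.filter_rows(hl.is_defined(pruned[gen_filt.row_key]))
```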

Even with another overnight run, hl.hwe_normalized_pca() still looks like a stuck job.

Happy to generate a log file for this if there is a way.

Is the log from the hl.init() statement the same log? It looks like I can access that from the terminal of the Terra virtual machine, and it is quite large now.

Oops, sorry, got confused: hl.copy_log is what I meant to point you to. That's a convenience utility that copies the log from the local disk of the driver machine to a Google bucket / remote FS.
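Usage is just a destination path in a bucket you own, e.g. (the path is a placeholder):

```
hl.copy_log('gs://my-bucket/logs/pca_run.log')  # placeholder bucket path
```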

Yeah, the log on that machine is what we want – I do expect it to be somewhat large.

Should I post it here?

Probably won't fit. Can you email it as an attachment or Drive link to hail-team@broadinstitute.org?


For anyone else following, I wanted to post an update on this. I was finally able to run the PCA job after some help from Tim.

We found that my MatrixTable, assembled via import_plink(), was not partitioned very efficiently (you can check this via mt.n_partitions()).

We were able to both repartition the data and save/checkpoint it as a MatrixTable for added efficiency with:

gen = gen.repartition(250).checkpoint('gs://some/path.mt', overwrite=True)

I did have some trouble with the shuffling step of the repartition due to using preemptible nodes, so Tim provided me with the following code for a “no shuffle” repartition:

```
def no_shuffle_repartition(mt, path1, path2, n_parts):
    # write the MatrixTable once so repartitioning doesn't recompute the whole pipeline
    mt = mt.checkpoint(path1)
    # re-read with explicit partition intervals (avoids a Spark shuffle), then checkpoint the result
    return hl.read_matrix_table(path1, _intervals=mt._calculate_new_partitions(n_parts)).checkpoint(path2)
```
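Called roughly like this (the bucket paths are placeholders):

```
gen = no_shuffle_repartition(
    gen,
    'gs://my-bucket/tmp/gen_stage1.mt',       # intermediate checkpoint (placeholder)
    'gs://my-bucket/gen_repartitioned.mt',    # repartitioned output (placeholder)
    n_parts=250,
)
```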