Table from pandas dataframe/aggregate problem

Hello, I'm new to Hail and was just trying to run some simple things to get to know my way around, but I'm having an issue that I can't locate the source of, and I would love some help!

I have data from the UK Biobank and am running a demo GWAS on chromosome 1 only, looking for associations with BMI.

First I imported the bgen file for chromosome 1 as follows, and that all seemed to work:

ef = ['GT', 'GP', 'dosage']
hl.index_bgen("ukb_imp_chr1_v3.bgen", contig_recoding={"01": "1"})
chr1 = hl.import_bgen("ukb_imp_chr1_v3.bgen", entry_fields=ef,
                      sample_file="ukb52480_imp_chr1_v3_s487314.sample")
chr1.describe()

2019-11-27 14:54:31 Hail: INFO: Finished writing index file for file:/data/ukb_imp_chr1_v3.bgen
2019-11-27 14:54:31 Hail: INFO: Number of BGEN files indexed: 1
2019-11-27 14:54:32 Hail: INFO: Number of BGEN files parsed: 1
2019-11-27 14:54:32 Hail: INFO: Number of samples in BGEN files: 487409
2019-11-27 14:54:32 Hail: INFO: Number of variants across all BGEN files: 7402791

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
----------------------------------------
Row fields:
    'locus': locus<GRCh37>
    'alleles': array<str>
    'rsid': str
    'varid': str
----------------------------------------
Entry fields:
    'GT': call
    'GP': array<float64>
    'dosage': float64
----------------------------------------
Column key: ['s']
Row key: ['locus', 'alleles']
----------------------------------------

I have a big CSV file with a lot of other info in it, and I only wanted the sample ID and BMI columns. I wasn't sure how to do this with Hail, so I used pandas read_csv with usecols to get just the two columns I wanted, then hl.Table.from_pandas to convert the dataframe into a Table. The conversion looked roughly like this (the file name here is a placeholder), and .describe() gives me the following, which also looks right:
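import pandas as pd

# read only the sample ID and BMI columns from the big phenotype CSV
# ("phenotypes.csv" is a placeholder for the real file)
phenos = pd.read_csv("phenotypes.csv", usecols=["eid", "21001-0.0"])
phenos.columns = ["s", "BMI"]  # rename to match the BGEN sample IDs
ht = hl.Table.from_pandas(phenos, key=["s"])
ht.describe()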

----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    's': str
    'BMI': float64
----------------------------------------
Key: ['s']
----------------------------------------

And .show() gives me the following, which also looks like what I expected:

s            BMI
str          float64
"1000011"    3.05e+01
"1000026"    2.40e+01
"1000032"    3.01e+01
"1000044"    2.75e+01
"1000058"    2.67e+01
"1000060"    2.18e+01
"1000075"    3.03e+01
"1000083"    2.87e+01
"1000097"    2.79e+01
"1000102"    3.32e+01
"1000116"    2.63e+01
"1000121"    3.47e+01
"1000139"    2.34e+01

Here's the issue: even though it says my BMI values are floats, and I can get min, max, mean etc. from the pandas dataframe, when I try to use hl.agg.stats on the BMI field I get NaN for everything except n. This obviously remains a problem if I try to add the BMI column to my matrix table and run linear regression.
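For reference, the aggregation I'm running is just this (ht is the phenotype table from above):

ht.aggregate(hl.agg.stats(ht.BMI))
# returns a struct where everything except n is NaN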

Do you have any idea what might be going wrong here?

Thank you in advance for your help!!

Hi Chloe,

My guess is that some of your BMI values are NaN. You can check how many of them are by doing something like

nan_rows = ht.filter(hl.is_nan(ht.BMI))  # keeps only the rows where BMI is NaN
nan_rows.count()

If you try to compute any stats on a column that has NaN in it, you’ll end up getting NaN as the result.

It looks like pandas skips NaN values by default in its min, max, and mean functions, which would explain why you don't see the problem in pandas.
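To mimic that behaviour in Hail, you can drop or mask the NaNs before aggregating; two untested sketches:

# option 1: filter the table itself
ht_clean = ht.filter(hl.is_nan(ht.BMI), keep=False)
ht_clean.aggregate(hl.agg.stats(ht_clean.BMI))

# option 2: mask the NaNs inside the aggregation only
ht.aggregate(hl.agg.filter(~hl.is_nan(ht.BMI), hl.agg.stats(ht.BMI)))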

Hi John, thanks for your help. Filtering the NaN values fixed the problem and allowed me to run my dummy GWAS.

Follow-up question: I was trying to make a q-q plot of my p-values, again just for practice. I used hl.plot.qq and it ran for about 24 hours and then just started hanging. I thought the issue might be that there are too many data points, so I set n_divisions=10 to downsample much more aggressively and produce fewer points, to see if that would at least give me a result. It ran for a couple of days and then gave me this error:

FatalError                                Traceback (most recent call last)
<ipython-input-12-899cbc3d1a1b> in <module>
      1 # have a go at a q-q plot with lower sampling
----> 2 p = hl.plot.qq(pvals, n_divisions=10)
      3 show(p)

</home/ch283/anaconda3/envs/hail/lib/python3.6/site-packages/decorator.py:decorator-gen-1505> in qq(pvals, label, title, xlabel, ylabel, size, legend, hover_fields, colors, width, height, collect_all, n_divisions, missing_label)

~/anaconda3/envs/hail/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586 
    587     return wrapper

~/anaconda3/envs/hail/lib/python3.6/site-packages/hail/plot/plots.py in qq(pvals, label, title, xlabel, ylabel, size, legend, hover_fields, colors, width, height, collect_all, n_divisions, missing_label)
   1289     else:
   1290         ht = source.select_rows(pval=pvals, **hover_fields, **label).rows()
-> 1291     ht = ht.key_by().select('pval', *hover_fields, *label).key_by('pval').persist()
   1292     n = ht.count()
   1293     ht = ht.annotate(

</home/ch283/anaconda3/envs/hail/lib/python3.6/site-packages/decorator.py:decorator-gen-1007> in persist(self, storage_level)

~/anaconda3/envs/hail/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586 
    587     return wrapper

~/anaconda3/envs/hail/lib/python3.6/site-packages/hail/table.py in persist(self, storage_level)
   1777             Persisted table.
   1778         """
-> 1779         return Env.backend().persist_table(self, storage_level)
   1780 
   1781     def unpersist(self) -> 'Table':

~/anaconda3/envs/hail/lib/python3.6/site-packages/hail/backend/backend.py in persist_table(self, t, storage_level)
    125 
    126     def persist_table(self, t, storage_level):
--> 127         return Table._from_java(self._to_java_ir(t._tir).pyPersist(storage_level))
    128 
    129     def unpersist_table(self, t):

~/anaconda3/envs/hail/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/anaconda3/envs/hail/lib/python3.6/site-packages/hail/utils/java.py in deco(*args, **kwargs)
    219             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    220                              'Hail version: %s\n'
--> 221                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
    222         except pyspark.sql.utils.CapturedException as e:
    223             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: SparkException: Job aborted due to stage failure: Task 577 in stage 17.0 failed 1 times, most recent failure: Lost task 577.0 in stage 17.0 (TID 645, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 1265734 ms

Any idea what the issue might be?

Thank you so much for your help.
Chloe

If something takes longer than a few hours and looks like it’s not making progress, you should kill it – it’s probably in some kind of Spark death spiral.

What system are you running on?

That's good to know, thanks – I wasn't sure what a normal runtime was on such a big dataset.

I’m using Ubuntu 18.04.2 LTS.

Could you possibly share the full pipeline? This will help us diagnose.

Of course, here it is:

import hail as hl
hl.init(default_reference='GRCh37')  

# import chr1 bgen as a MatrixTable
ef = ['GT', 'GP', 'dosage']
hl.index_bgen("ukb_imp_chr1_v3.bgen", contig_recoding={"01": "1"})
chr1 = hl.import_bgen("ukb_imp_chr1_v3.bgen", entry_fields=ef,
                      sample_file="ukb52480_imp_chr1_v3_s487314.sample")

# read in phenotypes and participant ids to pandas dataframe
import pandas as pd
phenos = pd.read_csv("ukb37493(2).csv", usecols=["eid", "21001-0.0"],
                     dtype={"eid": str, "21001-0.0": float})
# rename the columns, then convert to a Hail Table
phenos.columns = ["eid", "BMI"]
ph = hl.Table.from_pandas(phenos, key=["eid"])
# join the phenotype table onto the MatrixTable
chr1 = chr1.annotate_cols(phenotype=ph[chr1.s])

# have a go at a GWAS
# (chr1_qc is defined in a cell omitted here; see my follow-up below)
gwas = hl.linear_regression_rows(y=chr1_qc.phenotype.BMI,
                                 x=chr1_qc.GT.n_alt_alleles(),
                                 covariates=[1.0])
gwas.describe()

# have a go at a q-q plot
from bokeh.io import show  # (imported earlier in my actual notebook)
pvals = gwas.p_value
p = hl.plot.qq(pvals, n_divisions=100)
show(p)

I will shortly be switching to a cluster which uses the bleeding-edge version of Ubuntu, so that might help if Ubuntu 18.04 is the problem.

Thanks again for your help.

I realised in my last response I accidentally omitted the cell where I filtered NaN phenotypes, but I did do this using:

from pprint import pprint

# remove individuals with NaN BMI values
ph_filtered = ph.filter(hl.is_nan(ph.BMI), keep=False)
pprint(ph.count() - ph_filtered.count())  # how many rows were dropped

And then joined ph_filtered to my MatrixTable.

I wondered if NaN p-values might also be the issue with the plotting, so I tried the same filtering technique. Using count, collect, or Table.any to find out how many p-values are NaN runs into memory errors, so it's not clear whether there actually are any NaN p-values, but I filtered them anyway and tried a Manhattan plot with a very low sampling rate just to get something to plot, as follows (a possibly cheaper way to count the NaNs is sketched after the snippet):

# remove NaN p-values
gwas_filtered = gwas.filter(hl.is_nan(gwas.p_value), keep=False)

# manhattan plot
m = hl.plot.manhattan(gwas_filtered.p_value, n_divisions=10)
show(m)
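As an aside, I believe a single streaming aggregation would count the NaN p-values without collecting anything locally, though I haven't verified this on my data:

# count NaN p-values in one pass instead of collect()-ing them
n_nan = gwas.aggregate(hl.agg.count_where(hl.is_nan(gwas.p_value)))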

And running the Manhattan plot gives me this error:

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-25-8cf686129d78> in <module>
      1 # manhattan plot
----> 2 m = hl.plot.manhattan(gwas_filtered.p_value, n_divisions=10)
      3 show(m)

</home/ch283/chloe/lib/python3.7/site-packages/decorator.py:decorator-gen-1567> in manhattan(pvals, locus, title, size, hover_fields, collect_all, n_divisions, significance_line)

~/chloe/lib/python3.7/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586 
    587     return wrapper

~/chloe/lib/python3.7/site-packages/hail/plot/plots.py in manhattan(pvals, locus, title, size, hover_fields, collect_all, n_divisions, significance_line)
   1378         ('_pval', pvals),
   1379         fields=hover_fields,
-> 1380         n_divisions=None if collect_all else n_divisions
   1381     )
   1382     source_pd['p_value'] = [10 ** (-p) for p in source_pd['_pval']]

~/chloe/lib/python3.7/site-packages/hail/plot/plots.py in _collect_scatter_plot_data(x, y, fields, n_divisions, missing_label)
    715 
    716         agg_f = x[1]._aggregation_method()
--> 717         res = agg_f(hail.agg.downsample(x[1], y[1], label=list(expressions.values()) if expressions else None, n_divisions=n_divisions))
    718         source_pd = pd.DataFrame([
    719             dict(

</home/ch283/chloe/lib/python3.7/site-packages/decorator.py:decorator-gen-1047> in aggregate(self, expr, _localize)

~/chloe/lib/python3.7/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586 
    587     return wrapper

~/chloe/lib/python3.7/site-packages/hail/table.py in aggregate(self, expr, _localize)
   1143 
   1144         if _localize:
-> 1145             return Env.backend().execute(agg_ir)
   1146         else:
   1147             return construct_expr(agg_ir, expr.dtype)

~/chloe/lib/python3.7/site-packages/hail/backend/backend.py in execute(self, ir, timed)
    107 
    108     def execute(self, ir, timed=False):
--> 109         result = json.loads(Env.hc()._jhc.backend().executeJSON(self._to_java_ir(ir)))
    110         value = ir.typ._from_json(result['value'])
    111         timings = result['timings']

~/chloe/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/chloe/lib/python3.7/site-packages/hail/utils/java.py in deco(*args, **kwargs)
    223             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    224                              'Hail version: %s\n'
--> 225                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
    226         except pyspark.sql.utils.CapturedException as e:
    227             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: SparkException: Job 12 cancelled because SparkContext was shut down

Java stack trace:
java.lang.RuntimeException: error while applying lowering 'InterpretNonCompilable'
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:26)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:18)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:18)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$apply$1.apply(CompileAndEvaluate.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.CompileAndEvaluate$.apply(CompileAndEvaluate.scala:14)
	at is.hail.backend.Backend$$anonfun$execute$1.apply(Backend.scala:56)
	at is.hail.backend.Backend$$anonfun$execute$1.apply(Backend.scala:56)
	at is.hail.utils.package$.using(package.scala:596)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:10)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:9)
	at is.hail.utils.package$.using(package.scala:596)
	at is.hail.annotations.Region$.scoped(Region.scala:18)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:9)
	at is.hail.backend.Backend.execute(Backend.scala:56)
	at is.hail.backend.Backend.executeJSON(Backend.scala:62)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

org.apache.spark.SparkException: Job 12 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:932)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:930)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:930)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2128)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2041)
	at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
	at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:575)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
	at scala.util.Try$.apply(Try.scala:192)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at is.hail.rvd.RVD.combine(RVD.scala:605)
	at is.hail.expr.ir.Interpret$.run(Interpret.scala:618)
	at is.hail.expr.ir.Interpret$.alreadyLowered(Interpret.scala:54)
	at is.hail.expr.ir.InterpretNonCompilable$.interpretAndCoerce$1(InterpretNonCompilable.scala:16)
	at is.hail.expr.ir.InterpretNonCompilable$.is$hail$expr$ir$InterpretNonCompilable$$rewrite$1(InterpretNonCompilable.scala:53)
	at is.hail.expr.ir.InterpretNonCompilable$.apply(InterpretNonCompilable.scala:58)
	at is.hail.expr.ir.lowering.InterpretNonCompilablePass$.transform(LoweringPass.scala:48)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:13)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3$$anonfun$1.apply(LoweringPass.scala:13)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:13)
	at is.hail.expr.ir.lowering.LoweringPass$$anonfun$apply$3.apply(LoweringPass.scala:11)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.lowering.LoweringPass$class.apply(LoweringPass.scala:11)
	at is.hail.expr.ir.lowering.InterpretNonCompilablePass$.apply(LoweringPass.scala:43)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:20)
	at is.hail.expr.ir.lowering.LoweringPipeline$$anonfun$apply$1.apply(LoweringPipeline.scala:18)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at is.hail.expr.ir.lowering.LoweringPipeline.apply(LoweringPipeline.scala:18)
	at is.hail.expr.ir.CompileAndEvaluate$$anonfun$apply$1.apply(CompileAndEvaluate.scala:16)
	at is.hail.utils.ExecutionTimer.time(ExecutionTimer.scala:69)
	at is.hail.expr.ir.CompileAndEvaluate$.apply(CompileAndEvaluate.scala:14)
	at is.hail.backend.Backend$$anonfun$execute$1.apply(Backend.scala:56)
	at is.hail.backend.Backend$$anonfun$execute$1.apply(Backend.scala:56)
	at is.hail.utils.package$.using(package.scala:596)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:10)
	at is.hail.expr.ir.ExecuteContext$$anonfun$scoped$1.apply(ExecuteContext.scala:9)
	at is.hail.utils.package$.using(package.scala:596)
	at is.hail.annotations.Region$.scoped(Region.scala:18)
	at is.hail.expr.ir.ExecuteContext$.scoped(ExecuteContext.scala:9)
	at is.hail.backend.Backend.execute(Backend.scala:56)
	at is.hail.backend.Backend.executeJSON(Backend.scala:62)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)




Hail version: 0.2.30-2ae07d872f43
Error summary: SparkException: Job 12 cancelled because SparkContext was shut down

----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 57752)
Traceback (most recent call last):
  File "/usr/lib/python3.7/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.7/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.7/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.7/socketserver.py", line 720, in __init__
    self.handle()
  File "/home/ch283/chloe/lib/python3.7/site-packages/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/home/ch283/chloe/lib/python3.7/site-packages/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/home/ch283/chloe/lib/python3.7/site-packages/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/home/ch283/chloe/lib/python3.7/site-packages/pyspark/serializers.py", line 717, in read_int
    raise EOFError
EOFError
----------------------------------------

I can’t really make head or tail of this, so any insight would be appreciated! I am now using a cluster with Ubuntu 19.10.

I think this is probably an OOM error. Can you share how your Spark cluster is configured? Probably the driver machine doesn't allocate enough memory. Depending on how you're running Spark, the default allocations can be quite low (hundreds of MB).
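You can check what the driver was actually given from inside a Hail session; a quick sketch using the standard Spark config key:

import hail as hl
sc = hl.spark_context()
print(sc.getConf().get('spark.driver.memory', '(not set: Spark default)'))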

Sorry, I was unclear there – I am not using a Spark cluster, just running hail/spark locally, but on a shared machine with a lot of cores. I think you are right that the memory allocation is the issue though, as I get a lot of memory errors despite having 500GB of RAM. Per your advice I’ve just had a look at my local spark setup and it’s running with 1024MB for the driver, which is obviously not nearly enough.

However, I'm not sure how to change this. I followed the Hail installation instructions here, and installed Hail in a Python virtual environment using pip, so I did not manually install Spark. From what I understand, when running Spark locally you need to change the driver memory settings manually in the config file, and I have no idea where to find this in my installation… I'm not even sure where Spark ended up; I can only see PySpark.

Thanks again for your help.

oh, yep, this is definitely it!!!

You can set an env variable to fix this:

export PYSPARK_SUBMIT_ARGS=--driver-memory XXG pyspark-shell

And we should add a bullet to the docs for “running on a big server in local mode”.

yes I suppose it’s a slightly weird situation!

When I do this, the shell rejects the trailing words (XXG pyspark-shell) as not valid identifiers and only sets the first part.

oh, oops, this is meant to have quotes:

export PYSPARK_SUBMIT_ARGS="--driver-memory <SOME_GIGS>G pyspark-shell"

(grabbed this from env | grep PYSPARK instead of my .rc file)

This has resolved the problem and I am getting plots, hooray! Thank you for your help 🙂

Is it normal for plotting in Hail to be very computationally intensive when the dataset is large? My Manhattan plot had all 128 cores running at 100% for about half an hour. Just wondering if this is a sign that there's a problem in my pipeline somewhere, or just to be expected with very large datasets.

Hail is built on Spark, and Spark uses a lazy execution model (this is necessary because we assume the data is too large to fit in RAM all at once). I suspect Hail did no work until you actually asked for the plot, at which point it did all the work necessary to display it. What operations did you perform before the plot?
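For example (an illustrative sketch, not your exact pipeline):

# these lines only build up a query plan; no data is read yet
ht2 = ht.filter(hl.is_nan(ht.BMI), keep=False)
ht3 = ht2.annotate(bmi_sq=ht2.BMI ** 2)

# an action like count(), show(), aggregate(), or a plot call is what
# actually executes the whole pipeline
ht3.count()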

Is there a way to set the PySpark configuration for Hail somewhere in Python code in a notebook? I am trying to put all the code in a notebook so you don't have to fiddle around with config files etc., but I can't find anything in hl.init() to start PySpark with more memory. I am running into GC out-of-memory errors myself.

Starting in 0.2.31, which I expect to be released today, it should be possible to do:

hl.init(spark_conf={'spark.driver.memory': 'XG', 'spark.executor.memory': 'XG'})

Which of these two is more important for avoiding the OOM issues I get with Hail when analyzing large datasets? I am just running vanilla Hail on a single server, so I am sort of guessing the executor one, but I gave both of them 4 GB and now my query runs fine…