YARN memory overhead

Running Hail on the cloud inside a notebook, I tried writing the columns of my matrix table to a CSV by running mt.cols().export(cols_csv, delimiter=',').
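
For reference, a rough sketch of the call in context; the read_matrix_table path and the optional key_cols_by() step (which silences the ordering warning below) are assumptions on my part, the other names come from the traceback:

import hail as hl

# assumption: the matrix table lives next to the CSV output path
path = 'gs://uk-biobank/aging_18448/genotypes/processed/somatic/tranche_4_27.bcfiltered'
chip_vars_mt = hl.read_matrix_table(path + '.mt')  # hypothetical '.mt' suffix

cols_csv = path + '.cols.csv'
print(cols_csv)

# unkeying the columns first avoids the "sorted by 'col_key'" warning
chip_vars_mt.key_cols_by().cols().export(cols_csv, delimiter=',')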

Error from notebook:

gs://uk-biobank/aging_18448/genotypes/processed/somatic/tranche_4_27.bcfiltered.cols.csv
2020-04-30 14:55:31 Hail: WARN: cols(): Resulting column table is sorted by 'col_key'.
    To preserve matrix table column order, first unkey columns with 'key_cols_by()'
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
<ipython-input-16-da8045357415> in <module>
      1 cols_csv = path + '.cols.csv'
      2 print(cols_csv)
----> 3 chip_vars_mt.cols().export(cols_csv, delimiter=',')

<decorator-gen-1077> in export(self, output, types_file, header, parallel, delimiter)

/opt/conda/default/lib/python3.6/site-packages/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    583     def wrapper(__original_func, *args, **kwargs):
    584         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 585         return __original_func(*args_, **kwargs_)
    586 
    587     return wrapper

/opt/conda/default/lib/python3.6/site-packages/hail/table.py in export(self, output, types_file, header, parallel, delimiter)
   1004         parallel = ExportType.default(parallel)
   1005         Env.backend().execute(
-> 1006             TableWrite(self._tir, TableTextWriter(output, types_file, header, parallel, delimiter)))
   1007 
   1008     def group_by(self, *exprs, **named_exprs) -> 'GroupedTable':

/opt/conda/default/lib/python3.6/site-packages/hail/backend/backend.py in execute(self, ir, timed)
    205     def execute(self, ir, timed=False):
    206         jir = self._to_java_ir(ir)
--> 207         result = json.loads(self._jhc.backend().executeJSON(jir))
    208         value = ir.typ._from_json(result['value'])
    209         timings = result['timings']

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/conda/default/lib/python3.6/site-packages/hail/utils/java.py in deco(*args, **kwargs)
    197         import pyspark
    198         try:
--> 199             return f(*args, **kwargs)
    200         except py4j.protocol.Py4JJavaError as e:
    201             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    334             raise Py4JError(
    335                 "An error occurred while calling {0}{1}{2}".
--> 336                 format(target_id, ".", name))
    337     else:
    338         type = answer[1]

Py4JError: An error occurred while calling o211.executeJSON

Error from the task in Spark:
runJob at RVD.scala:685

ExecutorLostFailure (executor 28 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 12.1 GB of 12 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead or disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714.

46 of 48 jobs were successful
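
For what it's worth, a minimal sketch of one way to raise that overhead, assuming the setting reaches Spark before the SparkContext (and therefore the YARN executors) exists; the values are placeholders, and on Dataproc the cluster-level Spark properties may take precedence over anything passed at init time:

import hail as hl

# assumption: nothing has started Spark yet in this session, so these values
# are applied when hl.init() creates the SparkContext
hl.init(spark_conf={
    'spark.yarn.executor.memoryOverhead': '4g',  # placeholder, size to your nodes
    'spark.executor.memory': '10g',              # placeholder
})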

The Spark conf that I initialize with:

sc = [('spark.jars',
  'file:///home/nicholas/miniconda3/envs/hail/lib/python3.7/site-packages/hail/hail-all-spark.jar'),
 ('spark.hadoop.io.compression.codecs',
  'org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,is.hail.io.compress.BGzipCodecTbi,org.apache.hadoop.io.compress.GzipCodec'),
 ('spark.ui.showConsoleProgress', 'false'),
 ('spark.executor.id', 'driver'),
 ('spark.logConf', 'true'),
 ('spark.kryo.registrator', 'is.hail.kryo.HailKryoRegistrator'),
 ('spark.driver.host', 'sci-pvm-nicholas.calicolabs.local'),
 ('spark.hadoop.mapreduce.input.fileinputformat.split.minsize', '134217729'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.driver.extraClassPath',
  '/home/nicholas/miniconda3/envs/hail/lib/python3.7/site-packages/hail/hail-all-spark.jar'),
 ('spark.kryoserializer.buffer.max', '1g'),
 ('spark.driver.port', '35007'),
 ('spark.driver.maxResultSize', '0'),
 ('spark.executor.extraClassPath', './hail-all-spark.jar'),
 ('spark.master', 'local[*]'),
 ('spark.repl.local.jars',
  'file:///home/nicholas/miniconda3/envs/hail/lib/python3.7/site-packages/hail/hail-all-spark.jar'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'Hail'),
 ('spark.driver.memory', '120G'),
 ('spark.app.id', 'local-1585615971418'),
 ('spark.executor.heartbeatInterval', '10s'),
 ('spark.network.timeout', '10000s'),
 ('spark.yarn.executor.memoryOverhead', '32G')]
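
As an aside, pairs like these only take effect if they reach Spark before the context is built; a minimal sketch of one way to do that, assuming a fresh session (the variable names are mine):

from pyspark import SparkConf, SparkContext
import hail as hl

# the full list of (key, value) tuples shown above
spark_conf_pairs = [
    ('spark.kryoserializer.buffer.max', '1g'),
    ('spark.yarn.executor.memoryOverhead', '32G'),
    # ... remaining pairs from the list above
]
conf = SparkConf().setAll(spark_conf_pairs)
spark_context = SparkContext(conf=conf)
hl.init(sc=spark_context)  # hand the pre-built context to Hail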

The actual Spark config when I look at it:

[('spark.eventLog.enabled', 'true'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.driver.extraClassPath',
  '/opt/conda/default/lib/python3.6/site-packages/hail/backend/hail-all-spark.jar,/opt/conda/default/lib/python3.6/site-packages/sparkmonitor/listener.jar'),
 ('spark.hadoop.io.compression.codecs',
  'org.apache.hadoop.io.compress.DefaultCodec,is.hail.io.compress.BGzipCodec,is.hail.io.compress.BGzipCodecTbi,org.apache.hadoop.io.compress.GzipCodec'),
 ('spark.ui.showConsoleProgress', 'false'),
 ('spark.speculation', 'true'),
 ('spark.app.id', 'application_1588179328554_0009'),
 ('spark.kryo.registrator', 'is.hail.kryo.HailKryoRegistrator'),
 ('spark.yarn.am.memory', '640m'),
 ('spark.executor.cores', '4'),
 ('spark.repl.local.jars',
  'file:///opt/conda/default/lib/python3.6/site-packages/hail/backend/hail-all-spark.jar,file:///opt/conda/default/lib/python3.6/site-packages/sparkmonitor/listener.jar'),
 ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'exome-m'),
 ('spark.extraListeners', 'sparkmonitor.listener.JupyterSparkMonitorListener'),
 ('spark.jars',
  'file:/opt/conda/default/lib/python3.6/site-packages/hail/backend/hail-all-spark.jar,file:/opt/conda/default/lib/python3.6/site-packages/sparkmonitor/listener.jar'),
 ('spark.driver.port', '38129'),
 ('spark.eventLog.dir', 'hdfs://exome-m/user/spark/eventlog'),
 ('spark.executor.instances', '2'),
 ('spark.driver.maxResultSize', '0'),
 ('spark.executor.extraClassPath', './hail-all-spark.jar'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.name', 'Hail'),
 ('spark.executor.extraJavaOptions', '-Xss4M'),
 ('spark.driver.host', 'exome-m.c.calico-uk-biobank.internal'),
 ('spark.yarn.historyServer.address', 'exome-m:18080'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.driver.memory', '41g'),
 ('spark.yarn.dist.jars',
  'file:///opt/conda/default/lib/python3.6/site-packages/hail/backend/hail-all-spark.jar,file:///opt/conda/default/lib/python3.6/site-packages/sparkmonitor/listener.jar'),
 ('spark.driver.appUIAddress',
  'http://exome-m.c.calico-uk-biobank.internal:4040'),
 ('spark.hadoop.mapreduce.input.fileinputformat.split.minsize', '0'),
 ('spark.shuffle.service.enabled', 'true'),
 ('spark.driver.extraJavaOptions', '-Xss4M'),
 ('spark.scheduler.mode', 'FAIR'),
 ('spark.hadoop.hive.execution.engine', 'mr'),
 ('spark.yarn.jars', 'local:/usr/lib/spark/jars/*'),
 ('spark.history.fs.logDirectory', 'hdfs://exome-m/user/spark/eventlog'),
 ('spark.scheduler.minRegisteredResourcesRatio', '0.0'),
 ('spark.executor.id', 'driver'),
 ('spark.logConf', 'true'),
 ('spark.yarn.secondary.jars', 'hail-all-spark.jar,listener.jar'),
 ('spark.executor.memory', '11171m'),
 ('spark.dynamicAllocation.maxExecutors', '10000'),
 ('spark.master', 'yarn'),
 ('spark.kryoserializer.buffer.max', '1g'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://exome-m:8088/proxy/application_1588179328554_0009'),
 ('spark.rpc.message.maxSize', '512'),
 ('spark.executorEnv.PYTHONHASHSEED', '0'),
 ('spark.executorEnv.PYTHONPATH',
  '/usr/lib/spark/python/lib/pyspark.zip:/usr/lib/spark/python/lib/py4j-0.10.7-src.zip<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'),
 ('spark.ui.proxyBase', '/proxy/application_1588179328554_0008'),
 ('spark.executorEnv.OPENBLAS_NUM_THREADS', '1'),
 ('spark.yarn.isPython', 'true'),
 ('spark.task.maxFailures', '20'),
 ('spark.sql.parquet.cacheMetadata', 'false'),
 ('spark.dynamicAllocation.enabled', 'true'),
 ('spark.sql.cbo.enabled', 'true')]
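
(For reference, a sketch of roughly how the live configuration can be read back; this is plain PySpark on the SparkContext Hail is attached to, nothing Hail-specific:)

import hail as hl

# getConf().getAll() returns the (key, value) pairs listed above
for key, value in sorted(hl.spark_context().getConf().getAll()):
    print(key, value)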

Any thoughts about what else I can do?

How many cols do you have, and how large is the schema of the table you’re exporting?

shape: (1164, 49396)
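
(For context, MatrixTable.count() is one way to get that shape; a sketch, assuming chip_vars_mt is the matrix table from the traceback:)

# a (n_rows, n_cols) tuple; counting touches the whole matrix table
n_rows, n_cols = chip_vars_mt.count()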

The cols schema is a bit weird:

--------------------------------------------------------
Type:
        struct {
        s: str, 
        mutation_count_A_T: int64, 
        mutation_count_A_C: int64, 
        mutation_count_A_G: int64, 
        mutation_count_T_A: int64, 
        mutation_count_T_C: int64, 
        mutation_count_T_G: int64, 
        mutation_count_C_A: int64, 
        mutation_count_C_T: int64, 
        mutation_count_C_G: int64, 
        mutation_count_G_A: int64, 
        mutation_count_G_T: int64, 
        mutation_count_G_C: int64, 
        mutation_count_missense_variant: int64, 
        `mutation_count_splice_region_variant&synonymous_variant`: int64, 
        mutation_count_frameshift_variant: int64, 
        `mutation_count_splice_region_variant&non_coding_transcript_exon_variant`: int64, 
        mutation_count_stop_gained: int64, 
        loci_depth: array<int32>, 
        allele_depth: array<int32>, 
        loci_contig: array<str>, 
        loci_position: array<int32>, 
        gene_name_with_mutation: array<str>, 
        variant_types: array<str>, 
        exon_number_col: array<str>, 
        amino_acid_change_col: array<str>, 
        amino_acid_number_col: array<str>, 
        alt_allele: array<str>, 
        ref_allele: array<str>, 
        mutation_count: int64, 
        mean_clone_size_by_col: float64, 
        gene: float64, 
        chip_mutation_count: int64, 
        cd_chip_gene_name_with_mutation: array<str>
    }
--------------------------------------------------------
Source:
    <hail.matrixtable.MatrixTable object at 0x7f8b3385fe48>
Index:
    ['column']
--------------------------------------------------------

I suspect this is related to putting too much data into memory at one time. How large are those arrays?
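
(A sketch of one way to check, using a column aggregation; loci_depth is just one of the array fields listed above, and chip_vars_mt is the matrix table name from the traceback:)

import hail as hl

# summary stats (mean, min, max, n) of the length of one array-valued column field
print(chip_vars_mt.aggregate_cols(hl.agg.stats(hl.len(chip_vars_mt.loci_depth))))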

Hi @njbernstein,

I wonder which command you used to get the “Actual spark config”?
Actually, I have a similar problem: I run Hail on my local server, but it runs extremely slowly. Currently I have trouble dealing with 500 gVCFs; for example, I cannot run PCA on them.
Could you please suggest which Spark settings I should adjust to improve performance?