Spark Tweak - to_pandas function

Hi Hail team,

I was wondering if you have a tweak to solve the following:

I pass a big file to the to_pandas() function and I get the following error:

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-23-e634dc0fa600> in <module>()
----> 1 pdRows=common_mt.rows().to_pandas(flatten=True)

<decorator-gen-832> in to_pandas(self, flatten)

~/hail-python.zip/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    558     def wrapper(__original_func, *args, **kwargs):
    559         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 560         return __original_func(*args_, **kwargs_)
    561 
    562     return wrapper

~/hail-python.zip/hail/table.py in to_pandas(self, flatten)
   2517 
   2518         """
-> 2519         return self.to_spark(flatten).toPandas()
   2520 
   2521     @staticmethod

/usr/lib/spark/python/pyspark/sql/dataframe.py in toPandas(self)
   1964                 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
   1965         else:
-> 1966             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   1967 
   1968             dtype = {}

/usr/lib/spark/python/pyspark/sql/dataframe.py in collect(self)
    464         """
    465         with SCCallSiteSync(self._sc) as css:
--> 466             port = self._jdf.collectToPython()
    467         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    468 

/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

~/hail-python.zip/hail/utils/java.py in deco(*args, **kwargs)
    208             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    209                              'Hail version: %s\n'
--> 210                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
    211         except pyspark.sql.utils.CapturedException as e:
    212             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: SparkException: Job aborted due to stage failure: Total size of serialized results of 53 tasks (1029.6 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

I modified the file /hail/hail/create_config_file.py and then compiled Hail again, but I still get the same error:

'properties': {
            'spark:spark.driver.memory': '{driver_memory}g',
            'spark:spark.driver.maxResultSize': '0',
            'spark:spark.task.maxFailures': '40',
            'spark:spark.kryoserializer.buffer.max': '25g',
            'spark:spark.driver.extraJavaOptions': '-Xss8M',
            'spark:spark.executor.extraJavaOptions': '-Xss8M',
            'hdfs:dfs.replication': '1'
        }

While performing this particular operation, my instances hit ~90% CPU usage but use only 15 of their 120 GB of RAM.

Any help will be appreciated.

Thanks,

Carlos

That file is related to our testing / continuous integration, so modifying it won’t help! How are you running Hail? You can pass Spark properties into spark-submit, pyspark, or cloudtools.
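For example, a sketch (not EMR-specific; the exact values are illustrative, not recommendations, and the same --conf flags work with spark-submit):

```shell
# Remove the driver's result-size cap (0 = unlimited) and give the
# driver more memory before starting the session.
pyspark \
  --conf spark.driver.maxResultSize=0 \
  --conf spark.driver.memory=8g
```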

Though I’d caution you that even if to_pandas makes it through the collection phase (memory constrained), you’ll hit a piece of Spark infrastructure that is incredibly slow and painful to use. If the dataframe has mostly primitive types, you might consider mt.rows().flatten().export(...) followed by pd.read_table.
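A sketch of that route: the Hail calls are shown as comments since they need a running cluster, and a small in-memory TSV stands in for the tab-separated file that export() writes (header row included), which pandas can read directly.

```python
import io

import pandas as pd

# On a cluster, you'd export the flattened rows table to TSV first:
#   mt.rows().flatten().export('rows.tsv')
# and then read it back on the driver (or locally) with pandas:
#   df = pd.read_table('rows.tsv')
#
# Simulated here with an in-memory TSV shaped like a typical export
# (field names are illustrative):
tsv = io.StringIO(
    'locus\talleles\tAF\n'
    '1:100\t["A","T"]\t0.12\n'
    '1:200\t["G","C"]\t0.45\n'
)
df = pd.read_table(tsv)  # read_table defaults to tab-separated
print(df.shape)  # (2, 3)
```

This avoids collecting everything through the Spark driver in one serialized result, which is what trips the maxResultSize limit.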


I’m on AWS EMR. Something that really speeds up the process is to do mt.key_cols_by(). I do use to_pandas(flatten=True); I’ll try mt.rows().flatten().export(...).

After modifying the create_config_file.py file I get a different error; I don’t know if it’s just a coincidence:

2018-09-20 16:36:36 Hail: INFO: Ordering unsorted dataset with network shuffle
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 466, in collect
    port = self._jdf.collectToPython()
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/hadoop/hail-python.zip/hail/utils/java.py", line 198, in deco
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o5968.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
/usr/lib/spark/python/pyspark/sql/dataframe.py in collect(self)
    465         with SCCallSiteSync(self._sc) as css:
--> 466             port = self._jdf.collectToPython()
    467         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))

/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 

~/hail-python.zip/hail/utils/java.py in deco(*args, **kwargs)
    197         try:
--> 198             return f(*args, **kwargs)
    199         except py4j.protocol.Py4JJavaError as e:

/usr/lib/spark/python/lib/py4j-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                 "An error occurred while calling {0}{1}{2}".
--> 328                 format(target_id, ".", name))
    329     else:

Py4JError: An error occurred while calling o5968.collectToPython

During handling of the above exception, another exception occurred:

Py4JError                                 Traceback (most recent call last)
<ipython-input-15-3f57beacf9f9> in <module>()
----> 1 pdBig=mtDP1.rows().to_pandas(flatten=True)

<decorator-gen-832> in to_pandas(self, flatten)

~/hail-python.zip/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    558     def wrapper(__original_func, *args, **kwargs):
    559         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 560         return __original_func(*args_, **kwargs_)
    561 
    562     return wrapper

~/hail-python.zip/hail/table.py in to_pandas(self, flatten)
   2513 
   2514         """
-> 2515         return self.to_spark(flatten).toPandas()
   2516 
   2517     @staticmethod

/usr/lib/spark/python/pyspark/sql/dataframe.py in toPandas(self)
   1964                 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
   1965         else:
-> 1966             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   1967 
   1968             dtype = {}

/usr/lib/spark/python/pyspark/sql/dataframe.py in collect(self)
    464         """
    465         with SCCallSiteSync(self._sc) as css:
--> 466             port = self._jdf.collectToPython()
    467         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    468 

/usr/lib/spark/python/pyspark/traceback_utils.py in __exit__(self, type, value, tb)
     76         SCCallSiteSync._spark_stack_depth -= 1
     77         if SCCallSiteSync._spark_stack_depth == 0:
---> 78             self._context._jsc.setCallSite(None)

/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

~/hail-python.zip/hail/utils/java.py in deco(*args, **kwargs)
    196         import pyspark
    197         try:
--> 198             return f(*args, **kwargs)
    199         except py4j.protocol.Py4JJavaError as e:
    200             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326             raise Py4JError(
    327                 "An error occurred while calling {0}{1}{2}".
--> 328                 format(target_id, ".", name))
    329     else:
    330         type = answer[1]

Py4JError: An error occurred while calling o3.setCallSite

this is just another kind of driver-out-of-memory issue.

Ah, we can totally fix that shuffle issue, too.

I haven’t used EMR but there must be a way to set Spark properties at job submission

also - key_cols_by() speeds up the rows().to_pandas()? that’s crazy! I would expect it to potentially make a difference for cols().to_pandas() on an old version of Hail, but something else must be happening

By default I create my clusters with the maximizeResourceAllocation option. However, I don’t know the details of how that option affects the Spark config. I need to look into this in more detail.

Yeah, key_cols_by() only helps speed up cols().to_pandas(), not rows().to_pandas().

OK, that’s actually fixed in the current version, I believe!

out of curiosity, what are you doing with pandas that you can’t do with Hail tables? I want Hail to be able to do it all.

here:
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html

You should be able to add other Spark properties inside the brackets in this line:

classification=spark-defaults,properties=[spark.executor.memory=2G]

Right, this is exactly where I found this:

Using maximizeResourceAllocation

You can configure your executors to utilize the maximum resources possible on each node in a cluster by using the spark configuration classification to set maximizeResourceAllocation option to true when you create a cluster.

So I’m assuming that executor options are set to unlimited already.

well, the memory options, probably. The max result size doesn’t seem to be covered there.
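If so, something like this in the EMR configurations JSON might cover it (a sketch: the spark-defaults and spark classifications come from the AWS docs linked above, but the property values are just examples):

```json
[
  {
    "Classification": "spark",
    "Properties": {
      "maximizeResourceAllocation": "true"
    }
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.driver.maxResultSize": "0",
      "spark.driver.memory": "8g"
    }
  }
]
```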

Right. That’s what I’m trying to find out, and that’s why I tried to tweak the options in create_config_file.py.

that file is only used by the Hail continuous integration server in its testing/deployment; you’ll want to set this in the EMR config as above, I think

but totally understand why you looked there!

My sample names follow a family-and-member structure. I want to perform analysis (variant and statistical) by family and also by members across families. In native Python I could use regexes to solve this, but I’m having a hard time finding a good combination of Hail functions to do it. Any ideas/suggestions?

Thanks Tim

so like family1_mother or family5_child? Depending on what you’re trying to compute, you can totally do this in Hail!

ht = mt.cols()

split = ht.s.split("_")
ht = ht.annotate(family = split[0],
                 role = split[1])
per_family_ht = ht.group_by('family')\
    .aggregate(mean_pheno_1 = hl.agg.mean(ht.pheno_1))

per_role_ht = ht.group_by('role')\
    .aggregate(mean_pheno_1 = hl.agg.mean(ht.pheno_1))

can you give an example of the analyses you want to do?