Spark Tweak - to_pandas function


#1

Hi Hail team,

I was wondering if you have a tweak to solve the following:

I pass a big table to the to_pandas() function and get the following error:

---------------------------------------------------------------------------
FatalError                                Traceback (most recent call last)
<ipython-input-23-e634dc0fa600> in <module>()
----> 1 pdRows=common_mt.rows().to_pandas(flatten=True)

<decorator-gen-832> in to_pandas(self, flatten)

~/hail-python.zip/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    558     def wrapper(__original_func, *args, **kwargs):
    559         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 560         return __original_func(*args_, **kwargs_)
    561 
    562     return wrapper

~/hail-python.zip/hail/table.py in to_pandas(self, flatten)
   2517 
   2518         """
-> 2519         return self.to_spark(flatten).toPandas()
   2520 
   2521     @staticmethod

/usr/lib/spark/python/pyspark/sql/dataframe.py in toPandas(self)
   1964                 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
   1965         else:
-> 1966             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   1967 
   1968             dtype = {}

/usr/lib/spark/python/pyspark/sql/dataframe.py in collect(self)
    464         """
    465         with SCCallSiteSync(self._sc) as css:
--> 466             port = self._jdf.collectToPython()
    467         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    468 

/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

~/hail-python.zip/hail/utils/java.py in deco(*args, **kwargs)
    208             raise FatalError('%s\n\nJava stack trace:\n%s\n'
    209                              'Hail version: %s\n'
--> 210                              'Error summary: %s' % (deepest, full, hail.__version__, deepest)) from None
    211         except pyspark.sql.utils.CapturedException as e:
    212             raise FatalError('%s\n\nJava stack trace:\n%s\n'

FatalError: SparkException: Job aborted due to stage failure: Total size of serialized results of 53 tasks (1029.6 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

I modified the file /hail/hail/create_config_file.py and then compiled Hail again, but I still get the same error:

'properties': {
    'spark:spark.driver.memory': '{driver_memory}g',
    'spark:spark.driver.maxResultSize': '0',
    'spark:spark.task.maxFailures': '40',
    'spark:spark.kryoserializer.buffer.max': '25g',
    'spark:spark.driver.extraJavaOptions': '-Xss8M',
    'spark:spark.executor.extraJavaOptions': '-Xss8M',
    'hdfs:dfs.replication': '1'
}

While performing this particular operation, my instances hit ~90% CPU usage but use only 15 of 120 GB of RAM.

Any help would be appreciated.

Thanks,

Carlos


#2

That file is related to our testing / continuous integration, so modifying it won’t help! How are you running Hail? You can pass Spark properties to spark-submit, pyspark, or cloudtools.
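
For example, if you build the SparkContext yourself before calling hl.init, you can set the property there. A minimal sketch (setting spark.driver.maxResultSize to '0', which removes the cap entirely):

from pyspark import SparkConf, SparkContext
import hail as hl

# Build a SparkContext with the driver result-size cap removed
# ('0' means unlimited) and hand it to Hail.
conf = SparkConf().set('spark.driver.maxResultSize', '0')
sc = SparkContext(conf=conf)
hl.init(sc=sc)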


#3

Though I’d caution you that even if to_pandas makes it through the (memory-constrained) collection phase, you’ll hit a piece of Spark infrastructure that is incredibly slow and painful to use. If the dataframe has mostly primitive types, you might consider mt.rows().flatten().export(...) followed by pd.read_table.
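
Roughly like this (a sketch; the output path is a placeholder and mt is your MatrixTable):

import pandas as pd
import hail as hl

# Export the flattened rows table as a TSV, then stream it back into
# pandas through Hail's hadoop filesystem helper.
mt.rows().flatten().export('hdfs:///tmp/rows.tsv')
with hl.hadoop_open('hdfs:///tmp/rows.tsv') as f:
    df = pd.read_table(f)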


#4

I’m on AWS EMR. Something that really speeds up the process is to use mt.key_cols_by(). I do use to_pandas(flatten=True); I’ll try mt.rows().flatten().export(...).

After modifying the create_config_file.py file I get a different error; I don’t know if it’s just a coincidence:

2018-09-20 16:36:36 Hail: INFO: Ordering unsorted dataset with network shuffle
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/sql/dataframe.py", line 466, in collect
    port = self._jdf.collectToPython()
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/hadoop/hail-python.zip/hail/utils/java.py", line 198, in deco
    return f(*args, **kwargs)
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name))
py4j.protocol.Py4JError: An error occurred while calling o5968.collectToPython

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1062, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 908, in send_command
    response = connection.send_command(command)
  File "/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py", line 1067, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
/usr/lib/spark/python/pyspark/sql/dataframe.py in collect(self)
    465         with SCCallSiteSync(self._sc) as css:
--> 466             port = self._jdf.collectToPython()
    467         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))

/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 

~/hail-python.zip/hail/utils/java.py in deco(*args, **kwargs)
    197         try:
--> 198             return f(*args, **kwargs)
    199         except py4j.protocol.Py4JJavaError as e:

/usr/lib/spark/python/lib/py4j-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                 "An error occurred while calling {0}{1}{2}".
--> 328                 format(target_id, ".", name))
    329     else:

Py4JError: An error occurred while calling o5968.collectToPython

During handling of the above exception, another exception occurred:

Py4JError                                 Traceback (most recent call last)
<ipython-input-15-3f57beacf9f9> in <module>()
----> 1 pdBig=mtDP1.rows().to_pandas(flatten=True)

<decorator-gen-832> in to_pandas(self, flatten)

~/hail-python.zip/hail/typecheck/check.py in wrapper(__original_func, *args, **kwargs)
    558     def wrapper(__original_func, *args, **kwargs):
    559         args_, kwargs_ = check_all(__original_func, args, kwargs, checkers, is_method=is_method)
--> 560         return __original_func(*args_, **kwargs_)
    561 
    562     return wrapper

~/hail-python.zip/hail/table.py in to_pandas(self, flatten)
   2513 
   2514         """
-> 2515         return self.to_spark(flatten).toPandas()
   2516 
   2517     @staticmethod

/usr/lib/spark/python/pyspark/sql/dataframe.py in toPandas(self)
   1964                 raise RuntimeError("%s\n%s" % (_exception_message(e), msg))
   1965         else:
-> 1966             pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
   1967 
   1968             dtype = {}

/usr/lib/spark/python/pyspark/sql/dataframe.py in collect(self)
    464         """
    465         with SCCallSiteSync(self._sc) as css:
--> 466             port = self._jdf.collectToPython()
    467         return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
    468 

/usr/lib/spark/python/pyspark/traceback_utils.py in __exit__(self, type, value, tb)
     76         SCCallSiteSync._spark_stack_depth -= 1
     77         if SCCallSiteSync._spark_stack_depth == 0:
---> 78             self._context._jsc.setCallSite(None)

/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

~/hail-python.zip/hail/utils/java.py in deco(*args, **kwargs)
    196         import pyspark
    197         try:
--> 198             return f(*args, **kwargs)
    199         except py4j.protocol.Py4JJavaError as e:
    200             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326             raise Py4JError(
    327                 "An error occurred while calling {0}{1}{2}".
--> 328                 format(target_id, ".", name))
    329     else:
    330         type = answer[1]

Py4JError: An error occurred while calling o3.setCallSite

#5

This is just another kind of driver-out-of-memory issue.


#6

Ah, we can totally fix that shuffle issue, too.


#7

I haven’t used EMR, but there must be a way to set Spark properties at job submission.


#8

Also, key_cols_by() speeds up rows().to_pandas()? That’s crazy! I would expect it to potentially make a difference for cols().to_pandas() on an old version of Hail, but something else must be happening.


#9

By default I create my clusters with the maximizeResourceAllocation option. However, I don’t know the details of how that option affects the Spark config file. I need to look into this in more detail.

Yeah, key_cols_by() only helps speed up cols().to_pandas(), not rows().to_pandas().


#10

OK, that’s actually fixed in the current version, I believe!


#11

Out of curiosity, what are you doing with pandas that you can’t do with Hail tables? I want Hail to be able to do it all.


#12

Here:
https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html

You should be able to add other Spark properties inside the brackets in this line:

classification=spark-defaults,properties=[spark.executor.memory=2G]
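
So for the error above, something like this should work (untested on my end):

classification=spark-defaults,properties=[spark.executor.memory=2G,spark.driver.maxResultSize=0]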

#13

Right, this is exactly where I found this:

Using maximizeResourceAllocation

You can configure your executors to utilize the maximum resources possible on each node in a cluster by using the spark configuration classification to set the maximizeResourceAllocation option to true when you create a cluster.

So I’m assuming that executor options are set to unlimited already.


#14

Well, the memory options, probably. The max result size doesn’t seem to be covered there.


#15

Right. That’s what I’m trying to find out, and that’s why I tried to tweak the options in create_config_file.py.


#16

That file is only used by the Hail continuous integration server for its testing/deployment. You’ll want to set that stuff in the EMR config as above, I think.


#17

But I totally understand why you looked there!


#18

My sample names follow a family-and-member structure. I want to perform analyses (variant and statistical) by family, and also by member across families. In native Python I could use regexes to solve this, but I’m having a hard time finding a good combination of Hail functions to do it. Any ideas/suggestions?

Thanks, Tim


#19

So, like family1_mother or family5_child? Depending on what you’re trying to compute, you can totally do this in Hail!

import hail as hl

ht = mt.cols()

# Split the sample ID (e.g. "family1_mother") into family and role.
split = ht.s.split("_")
ht = ht.annotate(family = split[0],
                 role = split[1])

# Mean phenotype within each family...
per_family_ht = ht.group_by('family')\
    .aggregate(mean_pheno_1 = hl.agg.mean(ht.pheno_1))

# ...and within each role across families.
per_role_ht = ht.group_by('role')\
    .aggregate(mean_pheno_1 = hl.agg.mean(ht.pheno_1))
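
And if you want statistics per family-role combination, you should be able to group by both fields at once:

per_family_role_ht = ht.group_by('family', 'role')\
    .aggregate(mean_pheno_1 = hl.agg.mean(ht.pheno_1))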

#20

Can you give an example of the analyses you want to do?