Please send a PR. Thanks for looking at this.

On Thu, Nov 16, 2017 at 7:27 AM Andrew Andrade <and...@andrewandrade.ca> wrote:
> Hello devs,
>
> I know a lot of great work has been done recently with pandas-to-Spark
> DataFrame conversion (and vice versa) using Apache Arrow, but I faced a
> specific pain point on a low-memory setup without Arrow.
>
> Specifically, I was seeing a driver OOM when running toPandas on a small
> dataset (<100 MB compressed). There was discussion about toPandas being slow
> <http://apache-spark-developers-list.1001551.n3.nabble.com/toPandas-very-slow-td16794.html>
> in March 2016 due to a self.collect(). A solution was found that builds a
> pandas DataFrame (or NumPy array) per partition using mapPartitions
> <https://gist.github.com/joshlk/871d58e01417478176e7>, but it was never
> implemented back into dataframe.py.
>
> I understand that using Apache Arrow will solve this, but in a setup
> without Arrow (like the one where I hit the pain point), I investigated the
> memory usage of toPandas versus the partition-by-partition to_pandas
> approach, and played with the number of partitions. The findings are here
> <https://gist.github.com/mrandrewandrade/7f5ff26c5275376d3cd5e427ca74d50f>.
>
> In summary, on a 147 MB dataset, toPandas used about 784 MB of memory,
> while doing it partition by partition (with 100 partitions) had an overhead
> of 76.30 MB and took almost half the time to run. I realize that Arrow
> solves this, but the modification is quite small and would greatly assist
> anyone who isn't able to use Arrow.
>
> Would a PR [1] from me to address this issue be welcome?
>
> Thanks,
>
> Andrew
>
> [1] From Josh's Gist:
>
> import pandas as pd
>
> def _map_to_pandas(rdds):
>     """ Needs to be here due to pickling issues """
>     return [pd.DataFrame(list(rdds))]
>
> def toPandas(df, n_partitions=None):
>     """
>     Returns the contents of `df` as a local `pandas.DataFrame` in a speedy
>     fashion. The DataFrame is repartitioned if `n_partitions` is passed.
>
>     :param df: pyspark.sql.DataFrame
>     :param n_partitions: int or None
>     :return: pandas.DataFrame
>     """
>     if n_partitions is not None:
>         df = df.repartition(n_partitions)
>     # Build one pandas DataFrame per partition, collect those, and
>     # concatenate on the driver instead of collecting raw Rows.
>     df_pand = df.rdd.mapPartitions(_map_to_pandas).collect()
>     df_pand = pd.concat(df_pand)
>     df_pand.columns = df.columns
>     return df_pand
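>
> For reference, a minimal usage sketch (the SparkSession setup and the toy
> DataFrame below are illustrative, not from the gist):
>
> from pyspark.sql import SparkSession
>
> spark = SparkSession.builder.getOrCreate()
> sdf = spark.range(1000000).selectExpr("id", "id * 2 AS doubled")  # toy data
> pdf = toPandas(sdf, n_partitions=100)  # local pandas DataFrame on the driver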