[ https://issues.apache.org/jira/browse/SPARK-36600?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-36600:
-----------------------------------
    Labels: easyfix pull-request-available  (was: easyfix)

> Optimise speed and memory with Pyspark when create DataFrame (with patch)
> -------------------------------------------------------------------------
>
>                 Key: SPARK-36600
>                 URL: https://issues.apache.org/jira/browse/SPARK-36600
>             Project: Spark
>          Issue Type: Improvement
>          Components: PySpark
>    Affects Versions: 3.1.2
>            Reporter: Philippe Prados
>            Priority: Trivial
>              Labels: easyfix, pull-request-available
>         Attachments: optimize_memory_pyspark.patch
>
>   Original Estimate: 2h
>  Remaining Estimate: 2h
>
> The Python method {{SparkSession._createFromLocal()}} starts by materializing the data: it creates a list if the data is not already an instance of list. But this is necessary only if the schema is not present.
> {quote}# make sure data could consumed multiple times
> if not isinstance(data, list):
>     data = list(data)
> {quote}
> If you use {{createDataFrame(data=_a_generator_, ...)}}, all the data is first saved in memory in a list, then converted to rows in memory, then converted to a buffer in pickle format, etc.
> Two lists are present in memory at the same time: the list created by _createFromLocal() and the list created later with
> {quote}# convert python objects to sql data
> data = [schema.toInternal(row) for row in data]
> {quote}
> The purpose of using a generator is to reduce the memory footprint when the data are built dynamically.
> {quote}def _createFromLocal(self, data, schema):
>     """
>     Create an RDD for DataFrame from a list or pandas.DataFrame, returns
>     the RDD and schema.
>     """
>     if schema is None or isinstance(schema, (list, tuple)):
>         *# make sure data could consumed multiple times*
>         *if inspect.isgeneratorfunction(data):*
>             *data = list(data)*
>         struct = self._inferSchemaFromList(data, names=schema)
>         converter = _create_converter(struct)
>         data = map(converter, data)
>         if isinstance(schema, (list, tuple)):
>             for i, name in enumerate(schema):
>                 struct.fields[i].name = name
>                 struct.names[i] = name
>             schema = struct
>     elif not isinstance(schema, StructType):
>         raise TypeError("schema should be StructType or list or None, but got: %s" % schema)
>     # convert python objects to sql data
>     data = [schema.toInternal(row) for row in data]
>     return self._sc.parallelize(data), schema{quote}
> It is therefore worthwhile to accept a generator.
>
> {quote}The patch:
> diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
> index 57c680fd04..0dba590451 100644
> --- a/python/pyspark/sql/session.py
> +++ b/python/pyspark/sql/session.py
> @@ -15,6 +15,7 @@
>  # limitations under the License.
>  #
>
> +import inspect
>  import sys
>  import warnings
>  from functools import reduce
> @@ -504,11 +505,11 @@ class SparkSession(SparkConversionMixin):
>          Create an RDD for DataFrame from a list or pandas.DataFrame, returns
>          the RDD and schema.
> """ > - # make sure data could consumed multiple times > - if not isinstance(data, list): > - data = list(data) > > if schema is None or isinstance(schema, (list, tuple)): > + # make sure data could consumed multiple times > + if inspect.isgeneratorfunction(data): # PPR > + data = list(data) > struct = self._inferSchemaFromList(data, names=schema) > converter = _create_converter(struct) > data = map(converter, data) > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org