HeartSaVioR commented on code in PR #50600:
URL: https://github.com/apache/spark/pull/50600#discussion_r2048041661


##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1369,8 +1374,212 @@ def flatten_columns(cur_batch, col_name):
         data_batches = generate_data_batches(_batches)
 
         for k, g in groupby(data_batches, key=lambda x: x[0]):
-            yield (TransformWithStateInPandasFuncMode.PROCESS_DATA, k, g)
+            yield (TransformWithStateInPySparkFuncMode.PROCESS_DATA, k, g)
+
+        yield (TransformWithStateInPySparkFuncMode.PROCESS_TIMER, None, None)
+
+        yield (TransformWithStateInPySparkFuncMode.COMPLETE, None, None)
+
+
+class TransformWithStateInPySparkRowSerializer(ArrowStreamUDFSerializer):
+    """
+    Serializer used by Python worker to evaluate UDF for
+    :meth:`pyspark.sql.GroupedData.transformWithState`.
+
+    Parameters
+    ----------
+    arrow_max_records_per_batch : int
+        Limit of the number of records that can be written to a single 
ArrowRecordBatch in memory.
+    """
+
+    def __init__(self, arrow_max_records_per_batch):
+        super(TransformWithStateInPySparkRowSerializer, self).__init__()
+        self.arrow_max_records_per_batch = arrow_max_records_per_batch
+        self.key_offsets = None
+
+    def load_stream(self, stream):
+        """
+        Read ArrowRecordBatches from stream, deserialize them to populate a 
list of data chunk, and
+        convert the data into a list of pandas.Series.

Review Comment:
   Yeah, nice finding!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to