HyukjinKwon commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1692329891
##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -358,6 +364,140 @@ def applyInPandasWithState(
         )
         return DataFrame(jdf, self.session)

+    def transformWithStateInPandas(
+        self,
+        statefulProcessor: StatefulProcessor,
+        outputStructType: Union[StructType, str],
+        outputMode: str,
+        timeMode: str,
+    ) -> DataFrame:
+        """
+        Invokes methods defined in the stateful processor used in arbitrary state API v2.
+        We allow the user to act on a per-group set of input rows along with keyed state,
+        and the user can choose to output/return 0 or more rows.
+
+        For a streaming dataframe, we will repeatedly invoke the interface methods for new
+        rows in each trigger, and the user's state/state variables will be stored
+        persistently across invocations.
+
+        The `statefulProcessor` should be a Python class that implements the interface
+        defined in pyspark.sql.streaming.stateful_processor.StatefulProcessor.
+
+        The `outputStructType` should be a :class:`StructType` describing the schema of
+        all elements in the returned value, `pandas.DataFrame`. The column labels of all
+        elements in the returned `pandas.DataFrame` must either match the field names in
+        the defined schema if specified as strings, or match the field data types by
+        position if not strings, e.g. integer indices.
+
+        The size of each `pandas.DataFrame` in both the input and output can be arbitrary.
+        The number of `pandas.DataFrame` objects in both the input and output can also be
+        arbitrary.
+
+        .. versionadded:: 4.0.0
+
+        Parameters
+        ----------
+        statefulProcessor : :class:`pyspark.sql.streaming.stateful_processor.StatefulProcessor`
+            Instance of StatefulProcessor whose functions will be invoked by the operator.
+        outputStructType : :class:`pyspark.sql.types.DataType` or str
+            The type of the output records. The value can be either a
+            :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string.
+        outputMode : str
+            The output mode of the stateful processor.
+        timeMode : str
+            The time mode semantics of the stateful processor for timers and TTL.
+
+        Examples
+        --------
+        >>> import pandas as pd
+        >>> from pyspark.sql import Row
+        >>> from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
+        >>> from pyspark.sql.types import StructType, StructField, LongType, StringType, IntegerType
+        >>> from typing import Iterator
+        >>> from pyspark.sql.functions import split, col

Review Comment:
   let's fix the import according to PEP 8 import order

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org