HeartSaVioR commented on code in PR #50718:
URL: https://github.com/apache/spark/pull/50718#discussion_r2061911264
########## python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py: ##########
@@ -227,6 +229,14 @@ def row(self):
         return RowMinEventTimeStatefulProcessor()
 
 
+class StatefulProcessorCompositeTypeFactory(StatefulProcessorFactory):
+    def row(self):
+        return RowStatefulProcessorCompositeType()
+
+    def pandas(self):
+        return StatefulProcessorCompositeType()

Review Comment:
   nit: Please add `Pandas` as prefix of the class, and switch the method order as convention.

########## python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py: ##########
@@ -1613,3 +1623,191 @@ def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
 
     def close(self) -> None:
         pass
+
+
+# A stateful processor that contains composite python type inside Value, List and Map state variable
+class StatefulProcessorCompositeType(StatefulProcessor):
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        super().init(handle)

Review Comment:
   nit: Is it necessary?

########## python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py: ##########
@@ -1613,3 +1623,191 @@ def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
 
     def close(self) -> None:
         pass
+
+
+# A stateful processor that contains composite python type inside Value, List and Map state variable
+class StatefulProcessorCompositeType(StatefulProcessor):
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        super().init(handle)
+        array_schema = StructType([
+            StructField("id", ArrayType(IntegerType())),  # Array of primitive
+            StructField("tags", ArrayType(ArrayType(StringType()))),  # Array of Array
+            StructField("metadata", ArrayType(  # Array of struct
+                StructType([
+                    StructField("key", StringType()),
+                    StructField("value", StringType())
+                ])
+            ))
+        ])
+        map_schema = StructType([
+            StructField("id", IntegerType(), True),
+            # Map of String -> Array of Int
+            StructField("attributes", MapType(StringType(), ArrayType(IntegerType())), True)
+        ])
+
+        self.array_state = handle.getValueState("array_state", array_schema)
+        self.list_state = handle.getListState("list_state", array_schema)
+        self.map_state = handle.getMapState("map_state", "name string", map_schema)
+
+    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
+        accumulated_value = 0
+        tag_field = [["dummy1", "dummy2"], ["dummy3"]]
+        metadata_field = [{"key": "env", "value": "prod"}, {"key": "region", "value": "us-west"}]
+
+        for pdf in rows:
+            print(f"input rows type: {type(rows)}")

Review Comment:
   nit: let's remove the log for debug

########## python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py: ##########
@@ -1613,3 +1623,191 @@ def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
 
     def close(self) -> None:
         pass
+
+
+# A stateful processor that contains composite python type inside Value, List and Map state variable
+class StatefulProcessorCompositeType(StatefulProcessor):
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        super().init(handle)
+        array_schema = StructType([
+            StructField("id", ArrayType(IntegerType())),  # Array of primitive
+            StructField("tags", ArrayType(ArrayType(StringType()))),  # Array of Array
+            StructField("metadata", ArrayType(  # Array of struct
+                StructType([
+                    StructField("key", StringType()),
+                    StructField("value", StringType())
+                ])
+            ))
+        ])
+        map_schema = StructType([
+            StructField("id", IntegerType(), True),
+            # Map of String -> Array of Int
+            StructField("attributes", MapType(StringType(), ArrayType(IntegerType())), True)
Review Comment:
   Shall we add nested map as well while we are here?
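As an illustration of that last suggestion, below is a minimal sketch of what a nested map field could look like alongside the PR's existing `map_schema`. The `scores` field, the `nested_map_schema` name, and the registration comment are hypothetical and not part of the PR:

```python
from pyspark.sql.types import (
    ArrayType,
    IntegerType,
    MapType,
    StringType,
    StructField,
    StructType,
)

# Hypothetical extension of the PR's map_schema: keeps the existing
# Map of String -> Array of Int field and adds a nested
# Map of String -> (Map of String -> Int) field for extra coverage.
nested_map_schema = StructType([
    StructField("id", IntegerType(), True),
    # Map of String -> Array of Int (already covered in the PR)
    StructField("attributes", MapType(StringType(), ArrayType(IntegerType())), True),
    # Map of String -> Map of String -> Int (the suggested nested map)
    StructField("scores", MapType(StringType(), MapType(StringType(), IntegerType())), True),
])

# Registration inside init() would follow the same pattern as the PR, e.g.:
#     self.map_state = handle.getMapState("map_state", "name string", nested_map_schema)
```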