HeartSaVioR commented on code in PR #50718:
URL: https://github.com/apache/spark/pull/50718#discussion_r2061911264


##########
python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py:
##########
@@ -227,6 +229,14 @@ def row(self):
         return RowMinEventTimeStatefulProcessor()
 
 
+class StatefulProcessorCompositeTypeFactory(StatefulProcessorFactory):
+    def row(self):
+        return RowStatefulProcessorCompositeType()
+
+    def pandas(self):
+        return StatefulProcessorCompositeType()

Review Comment:
   nit: Please prefix the pandas processor class name with `Pandas`, and swap 
the method order to follow the convention (`pandas()` before `row()`).
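
A minimal sketch of how the factory might look after this change. The base class and the two processor classes below are empty stand-ins so the snippet runs without PySpark, and the name `PandasStatefulProcessorCompositeType` is an assumed result of the rename, not something taken from the PR.

```python
# Stand-in for the real base class in the test helper (assumption: it only
# needs to expose pandas() and row() for this illustration).
class StatefulProcessorFactory:
    def pandas(self):
        raise NotImplementedError

    def row(self):
        raise NotImplementedError


# Hypothetical name after adding the `Pandas` prefix; body omitted.
class PandasStatefulProcessorCompositeType:
    pass


# Name as it appears in the diff; body omitted.
class RowStatefulProcessorCompositeType:
    pass


class StatefulProcessorCompositeTypeFactory(StatefulProcessorFactory):
    # pandas() is defined first, then row(), matching the requested order.
    def pandas(self):
        return PandasStatefulProcessorCompositeType()

    def row(self):
        return RowStatefulProcessorCompositeType()
```

Only the class name and the order of the two methods change; what each method returns stays the same.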



##########
python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py:
##########
@@ -1613,3 +1623,191 @@ def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
 
     def close(self) -> None:
         pass
+
+
+# A stateful processor that contains composite python type inside Value, List and Map state variable
+class StatefulProcessorCompositeType(StatefulProcessor):
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        super().init(handle)

Review Comment:
   nit: Is this `super().init(handle)` call necessary?



##########
python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py:
##########
@@ -1613,3 +1623,191 @@ def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
 
     def close(self) -> None:
         pass
+
+
+# A stateful processor that contains composite python type inside Value, List and Map state variable
+class StatefulProcessorCompositeType(StatefulProcessor):
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        super().init(handle)
+        array_schema = StructType([
+            StructField("id", ArrayType(IntegerType())),  # Array of primitive
+            StructField("tags", ArrayType(ArrayType(StringType()))),  # Array of Array
+            StructField("metadata", ArrayType(  # Array of struct
+                StructType([
+                    StructField("key", StringType()),
+                    StructField("value", StringType())
+                ])
+            ))
+        ])
+        map_schema = StructType([
+            StructField("id", IntegerType(), True),
+            # Map of String -> Array of Int
+            StructField("attributes", MapType(StringType(), ArrayType(IntegerType())), True)
+        ])
+
+        self.array_state = handle.getValueState("array_state", array_schema)
+        self.list_state = handle.getListState("list_state", array_schema)
+        self.map_state = handle.getMapState("map_state", "name string", map_schema)
+
+    def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
+        accumulated_value = 0
+        tag_field = [["dummy1", "dummy2"], ["dummy3"]]
+        metadata_field = [{"key": "env", "value": "prod"}, {"key": "region", "value": "us-west"}]
+
+        for pdf in rows:
+            print(f"input rows type: {type(rows)}")

Review Comment:
   nit: Let's remove this debug `print` statement.



##########
python/pyspark/sql/tests/pandas/helper/helper_pandas_transform_with_state.py:
##########
@@ -1613,3 +1623,191 @@ def handleInputRows(self, key, rows, timerValues) -> Iterator[Row]:
 
     def close(self) -> None:
         pass
+
+
+# A stateful processor that contains composite python type inside Value, List and Map state variable
+class StatefulProcessorCompositeType(StatefulProcessor):
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        super().init(handle)
+        array_schema = StructType([
+            StructField("id", ArrayType(IntegerType())),  # Array of primitive
+            StructField("tags", ArrayType(ArrayType(StringType()))),  # Array of Array
+            StructField("metadata", ArrayType(  # Array of struct
+                StructType([
+                    StructField("key", StringType()),
+                    StructField("value", StringType())
+                ])
+            ))
+        ])
+        map_schema = StructType([
+            StructField("id", IntegerType(), True),
+            # Map of String -> Array of Int
+            StructField("attributes", MapType(StringType(), ArrayType(IntegerType())), True)

Review Comment:
   Shall we add a nested map (a map whose values are themselves maps) as well while we are here?
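
To make the suggestion concrete, here is a rough sketch of the shape a nested map field could take. It is written as a DDL-style schema string plus a matching plain-Python value so it runs without PySpark. The field name `nested`, the sample data, and the helper `is_nested_map` are all hypothetical and not part of the PR.

```python
# Hypothetical extension of map_schema, expressed as a DDL-style string.
# "nested" is the assumed new field: a map whose values are maps.
NESTED_MAP_SCHEMA_DDL = (
    "id int, "
    "attributes map<string, array<int>>, "
    "nested map<string, map<string, int>>"
)

# A sample value with the same shape, using plain Python objects:
# (id, attributes, nested)
sample_map_value = (
    1,
    {"scores": [1, 2, 3]},
    {"outer": {"inner": 10}},
)


def is_nested_map(value) -> bool:
    """Return True if value is a non-empty dict whose values are all dicts."""
    return (
        isinstance(value, dict)
        and len(value) > 0
        and all(isinstance(v, dict) for v in value.values())
    )
```

In the test helper itself, the equivalent would presumably be a `MapType` whose value type is another `MapType`, added as one more `StructField` in `map_schema`.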


