HeartSaVioR commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1918562610


##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -1698,6 +1876,173 @@ def init(self, handle: StatefulProcessorHandle) -> None:
         self.list_state = handle.getListState("listState", key_schema)
 
 
+class BasicProcessor(StatefulProcessor):
+    # Schema definitions
+    state_schema = StructType(
+        [StructField("id", IntegerType(), True), StructField("name", StringType(), True)]
+    )
+
+    def init(self, handle):
+
+        self.state = handle.getValueState("state", self.state_schema)
+
+    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
+        for pdf in rows:

Review Comment:
   I'd need to check again, but I'd be careful here: this could give a different result depending on the Arrow batch size. I assume we expect two input rows per key, so if the Arrow batch size is 1, the loop will execute twice.
   
   @bogao007 Do we break the inputs of the same key into multiple Arrow batches, as we do with applyInPandasWithState? I roughly remember we do, but I'd like to double-confirm.
   
   Anyway, if the test is not going to read the actual rows (I guess this applies to all tests), you can just consume them without doing anything and put the rest of the code after that. Could you give the below a try?
   
   ```python
   for pdf in rows:
     pass
   
   id_val = int(key[0])
   name = f"name-{id_val}"
   self.state.update((id_val, name))
   yield pd.DataFrame({"id": [key[0]], "value": [{"id": id_val, "name": name}]})
   ```
   
   If the above works, please apply this pattern to all tests.
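   If helpful, the batching concern can be reproduced outside Spark with plain pandas. This is a minimal sketch (`handle_input_rows` is a hypothetical stand-in for the processor's `handleInputRows`, not the real `StatefulProcessor` API): draining the iterator with `pass` and deriving output from the key alone yields the same result whether a key's rows arrive in one Arrow batch or several.
   
   ```python
   import pandas as pd
   
   def handle_input_rows(key, rows):
       # Hypothetical stand-in for handleInputRows (illustration only, not
       # the real StatefulProcessor API): drain the input iterator without
       # reading the rows, then derive the output from the key alone.
       for pdf in rows:
           pass
       id_val = int(key[0])
       name = f"name-{id_val}"
       yield pd.DataFrame({"id": [key[0]], "value": [{"id": id_val, "name": name}]})
   
   # The same two input rows for one key, delivered as a single batch vs.
   # two batches -- mimicking different Arrow batch sizes.
   one_batch = [pd.DataFrame({"id": [1, 1]})]
   two_batches = [pd.DataFrame({"id": [1]}), pd.DataFrame({"id": [1]})]
   
   out_all = pd.concat(handle_input_rows((1,), iter(one_batch)), ignore_index=True)
   out_split = pd.concat(handle_input_rows((1,), iter(two_batches)), ignore_index=True)
   assert out_all.equals(out_split)  # output does not depend on batching
   ```
   
   In contrast, any per-row work done inside the loop would run a different number of times depending on how the batches are split.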



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala:
##########
@@ -406,8 +439,12 @@ class TransformWithListStateSuite extends StreamTest
 
         testStream(result1, OutputMode.Update())(
           StartStream(checkpointLocation = dir.getCanonicalPath),
-          AddData(inputData, "a", "b"),
-          CheckNewAnswer(("a", "a"), ("b", "b")),
+          // Write data with initial schema
+          AddData(inputData, "item1", "item2"),
+          CheckAnswer(("item1", 1), ("item2", 1)),
+          // Add more items to verify count increment
+          AddData(inputData, "item1", "item3"),
+          CheckAnswer(("item1", 1), ("item2", 1), ("item1", 2), ("item3", 1)),

Review Comment:
   nit: Let's use CheckNewAnswer to verify only the new output. That'd be easier to reason about.
   
   If you intend to explain the rows in the state store at this point, it's probably better to leave a code comment, or even verify them with the state data source reader.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

