HeartSaVioR commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1918562610
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -1698,6 +1876,173 @@ def init(self, handle: StatefulProcessorHandle) -> None:
         self.list_state = handle.getListState("listState", key_schema)


+class BasicProcessor(StatefulProcessor):
+    # Schema definitions
+    state_schema = StructType(
+        [StructField("id", IntegerType(), True), StructField("name", StringType(), True)]
+    )
+
+    def init(self, handle):
+        self.state = handle.getValueState("state", self.state_schema)
+
+    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
+        for pdf in rows:

Review Comment:
   I'd need to check again, but I'd be careful here: this could give a different result depending on the Arrow batch size. I assume we expect two input rows per key, so if the Arrow batch size = 1, the loop will be executed two times.

   @bogao007 Do we break the inputs of the same key into multiple Arrow batches, like we do with applyInPandasWithState? I roughly remember we do, but I'd like to double-confirm.

   Anyway, if the test is not going to read the actual rows (I guess this applies to all tests), you can just consume them without doing anything, and put the remaining code after the loop. Could you give the below a try?

   ```
   for pdf in rows:
       pass

   id_val = int(key[0])
   name = f"name-{id_val}"
   self.state.update((id_val, name))
   yield pd.DataFrame({"id": [key[0]], "value": [{"id": id_val, "name": name}]})
   ```

   If the above works, please apply this pattern to all tests.

##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala:
##########
@@ -406,8 +439,12 @@ class TransformWithListStateSuite extends StreamTest
     testStream(result1, OutputMode.Update())(
       StartStream(checkpointLocation = dir.getCanonicalPath),
-      AddData(inputData, "a", "b"),
-      CheckNewAnswer(("a", "a"), ("b", "b")),
+      // Write data with initial schema
+      AddData(inputData, "item1", "item2"),
+      CheckAnswer(("item1", 1), ("item2", 1)),
+      // Add more items to verify count increment
+      AddData(inputData, "item1", "item3"),
+      CheckAnswer(("item1", 1), ("item2", 1), ("item1", 2), ("item3", 1)),

Review Comment:
   nit: Let's use CheckNewAnswer to verify the new output. That'd be easier to reason about. If you intend to explain the rows in the state store at this point, it's probably better to leave a code comment, or even verify them with the state data source reader.
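   For illustration, a minimal sketch of what the quoted block could look like with CheckNewAnswer. It assumes the same inputData stream, result1 query, and dir checkpoint directory from the diff above, and the rows in the second CheckNewAnswer are simply the delta implied by the cumulative CheckAnswer in the diff:

   ```
   testStream(result1, OutputMode.Update())(
     StartStream(checkpointLocation = dir.getCanonicalPath),
     // Write data with initial schema
     AddData(inputData, "item1", "item2"),
     // CheckNewAnswer verifies only the rows emitted since the last check
     CheckNewAnswer(("item1", 1), ("item2", 1)),
     // Add more items to verify count increment; only the rows produced by
     // this micro-batch are listed here
     AddData(inputData, "item1", "item3"),
     CheckNewAnswer(("item1", 2), ("item3", 1))
   )
   ```

   This keeps each assertion local to the batch that produced it, so a reader doesn't have to mentally diff two cumulative result lists.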