LuciferYang commented on PR #50686: URL: https://github.com/apache/spark/pull/50686#issuecomment-2837303464
@bogao007 I've noticed that after this PR was merged, one of the PySpark test cases on `branch-4.0` started failing. Do you have time to help check whether this failure was caused by this PR? Thanks ~

- https://github.com/apache/spark/actions/runs/14634354144/job/41062389133
- https://github.com/apache/spark/actions/runs/14720999763/job/41314744371

```
======================================================================
ERROR [3.032s]: test_transform_with_map_state_metadata (pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests.test_transform_with_map_state_metadata)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 1092, in test_transform_with_map_state_metadata
    self._test_transform_with_state_in_pandas_basic(
  File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 167, in _test_transform_with_state_in_pandas_basic
    q.processAllAvailable()
  File "/__w/spark/spark/python/pyspark/sql/streaming/query.py", line 351, in processAllAvailable
    return self._jsq.processAllAvailable()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 258, in deco
    raise converted from None
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 93bca03a-a4d4-449e-b821-d6e965cb1568, runId = e807cd45-221f-4c42-82a0-7122ced903e0] terminated with exception: [FOREACH_BATCH_USER_FUNCTION_ERROR] An error occurred in the user provided function in foreach batch sink. Reason: An exception was raised by the Python Proxy.
Return Message: Traceback (most recent call last):
  File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
    raise e
  File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 1043, in check_results
    assert map_state_df.selectExpr(
           ^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError
SQLSTATE: 39000
SQLSTATE: XXKST

=== Streaming Query ===
Identifier: this_query [id = 93bca03a-a4d4-449e-b821-d6e965cb1568, runId = e807cd45-221f-4c42-82a0-7122ced903e0]
Current Committed Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl]: {"logOffset":0}}
Current Available Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl]: {"logOffset":0}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
~WriteToMicroBatchDataSourceV1 ForeachBatchSink, 93bca03a-a4d4-449e-b821-d6e965cb1568, [queryName=this_query, checkpointLocation=/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmp2qsihe_d], Update
+- ~TransformWithStateInPandas transformWithStateUDF(id#2296, temperature#2297)#2298, 1, [id#2299, countAsString#2300], Update, ProcessingTime, false, 0
   :- ~Project [id#2296, id#2296, temperature#2297]
   :  +- ~Project [cast(split_values#2293[0] as string) AS id#2296, cast(split_values#2293[1] as int) AS temperature#2297]
   :     +- ~Project [value#2291, split(value#2291, ,, -1) AS split_values#2293]
   :        +- ~StreamingExecutionRelation FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl], [value#2291]
   +- LocalRelation <empty>

JVM stacktrace:
org.apache.spark.sql.streaming.StreamingQueryException: An exception was raised by the Python Proxy.
Return Message: Traceback (most recent call last):
  File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
    raise e
  File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 1043, in check_results
    assert map_state_df.selectExpr(
           ^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError

=== Streaming Query ===
Identifier: this_query [id = 93bca03a-a4d4-449e-b821-d6e965cb1568, runId = e807cd45-221f-4c42-82a0-7122ced903e0]
Current Committed Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl]: {"logOffset":0}}
Current Available Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl]: {"logOffset":0}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
~WriteToMicroBatchDataSourceV1 ForeachBatchSink, 93bca03a-a4d4-449e-b821-d6e965cb1568, [queryName=this_query, checkpointLocation=/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmp2qsihe_d], Update
+- ~TransformWithStateInPandas transformWithStateUDF(id#2296, temperature#2297)#2298, 1, [id#2299, countAsString#2300], Update, ProcessingTime, false, 0
   :- ~Project [id#2296, id#2296, temperature#2297]
   :  +- ~Project [cast(split_values#2293[0] as string) AS id#2296, cast(split_values#2293[1] as int) AS temperature#2297]
   :     +- ~Project [value#2291, split(value#2291, ,, -1) AS split_values#2293]
   :        +- ~StreamingExecutionRelation FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl], [value#2291]
   +- LocalRelation <empty>

	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:372)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
Caused by: py4j.Py4JException: An exception was raised by the Python Proxy.
Return Message: Traceback (most recent call last):
  File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
    raise e
  File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 1043, in check_results
    assert map_state_df.selectExpr(
           ^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError
	at py4j.Protocol.getReturnValue(Protocol.java:476)
	at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
	at jdk.proxy3/jdk.proxy3.$Proxy45.call(Unknown Source)
	at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:87)
	at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:87)
	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56)
	at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:879)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
	at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:876)
	at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:876)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:394)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:364)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:344)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:344)
	at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
	at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:70)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:82)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:344)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
	... 1 more
```

also cc @HeartSaVioR
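For anyone who wants to reproduce this locally, here is a minimal sketch of a runner for just the failing case (it assumes the standard PySpark test setup in a Spark source checkout, i.e. pandas/pyarrow installed and `python/` plus the bundled py4j zip on `PYTHONPATH`; the module, class, and test names are taken from the traceback above):

```python
# Minimal repro sketch: run only the failing test case from a Spark dev checkout.
# Assumes the usual PySpark test environment; nothing below is specific to the fix.
import unittest

from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
    TransformWithStateInPandasTests,
)

if __name__ == "__main__":
    suite = unittest.TestSuite()
    # Target only the case that fails in the linked CI runs.
    suite.addTest(
        TransformWithStateInPandasTests("test_transform_with_map_state_metadata")
    )
    unittest.TextTestRunner(verbosity=2).run(suite)
```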