LuciferYang commented on PR #50686:
URL: https://github.com/apache/spark/pull/50686#issuecomment-2837303464

   @bogao007 I've noticed that after this pr  was merged, one of the PySpark 
test cases in the `branch-4.0` started failing. Do you have the time to help 
check whether this failure was caused by this pr? Thanks ~
   
   
![image](https://github.com/user-attachments/assets/d10df637-71bc-4894-a954-73291867aff0)
   
   
   - https://github.com/apache/spark/actions/runs/14634354144/job/41062389133
   - https://github.com/apache/spark/actions/runs/14720999763/job/41314744371
   
   ```======================================================================
   ERROR [3.032s]: test_transform_with_map_state_metadata 
(pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests.test_transform_with_map_state_metadata)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File 
"/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
 line 1092, in test_transform_with_map_state_metadata
       self._test_transform_with_state_in_pandas_basic(
     File 
"/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
 line 167, in _test_transform_with_state_in_pandas_basic
       q.processAllAvailable()
     File "/__w/spark/spark/python/pyspark/sql/streaming/query.py", line 351, 
in processAllAvailable
       return self._jsq.processAllAvailable()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 
1362, in __call__
       return_value = get_return_value(
                      ^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 
258, in deco
       raise converted from None
   pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] 
Query [id = 93bca03a-a4d4-449e-b821-d6e965cb1568, runId = 
e807cd45-221f-4c42-82a0-7122ced903e0] terminated with exception: 
[FOREACH_BATCH_USER_FUNCTION_ERROR] An error occurred in the user provided 
function in foreach batch sink. Reason: An exception was raised by the Python 
Proxy. Return Message: Traceback (most recent call last):
     File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 
644, in _call_proxy
       return_value = getattr(self.pool[obj_id], method)(*params)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
       raise e
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
       self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
     File 
"/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
 line 1043, in check_results
       assert map_state_df.selectExpr(
              ^^^^^^^^^^^^^^^^^^^^^^^^
   AssertionError
    SQLSTATE: 39000 SQLSTATE: XXKST
   === Streaming Query ===
   Identifier: this_query [id = 93bca03a-a4d4-449e-b821-d6e965cb1568, runId = 
e807cd45-221f-4c42-82a0-7122ced903e0]
   Current Committed Offsets: 
{FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl]:
 {"logOffset":0}}
   Current Available Offsets: 
{FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl]:
 {"logOffset":0}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   ~WriteToMicroBatchDataSourceV1 ForeachBatchSink, 
93bca03a-a4d4-449e-b821-d6e965cb1568, [queryName=this_query, 
checkpointLocation=/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmp2qsihe_d],
 Update
   +- ~TransformWithStateInPandas transformWithStateUDF(id#2296, 
temperature#2297)#2298, 1, [id#2299, countAsString#2300], Update, 
ProcessingTime, false, 0
      :- ~Project [id#2296, id#2296, temperature#2297]
      :  +- ~Project [cast(split_values#2293[0] as string) AS id#2296, 
cast(split_values#2293[1] as int) AS temperature#2297]
      :     +- ~Project [value#2291, split(value#2291, ,, -1) AS 
split_values#2293]
      :        +- ~StreamingExecutionRelation 
FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl],
 [value#2291]
      +- LocalRelation <empty>
   
   
   JVM stacktrace:
   org.apache.spark.sql.streaming.StreamingQueryException: An exception was 
raised by the Python Proxy. Return Message: Traceback (most recent call last):
     File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 
644, in _call_proxy
       return_value = getattr(self.pool[obj_id], method)(*params)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
       raise e
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
       self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
     File 
"/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
 line 1043, in check_results
       assert map_state_df.selectExpr(
              ^^^^^^^^^^^^^^^^^^^^^^^^
   AssertionError
   
   === Streaming Query ===
   Identifier: this_query [id = 93bca03a-a4d4-449e-b821-d6e965cb1568, runId = 
e807cd45-221f-4c42-82a0-7122ced903e0]
   Current Committed Offsets: 
{FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl]:
 {"logOffset":0}}
   Current Available Offsets: 
{FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl]:
 {"logOffset":0}}
   
   Current State: ACTIVE
   Thread State: RUNNABLE
   
   Logical Plan:
   ~WriteToMicroBatchDataSourceV1 ForeachBatchSink, 
93bca03a-a4d4-449e-b821-d6e965cb1568, [queryName=this_query, 
checkpointLocation=/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmp2qsihe_d],
 Update
   +- ~TransformWithStateInPandas transformWithStateUDF(id#2296, 
temperature#2297)#2298, 1, [id#2299, countAsString#2300], Update, 
ProcessingTime, false, 0
      :- ~Project [id#2296, id#2296, temperature#2297]
      :  +- ~Project [cast(split_values#2293[0] as string) AS id#2296, 
cast(split_values#2293[1] as int) AS temperature#2297]
      :     +- ~Project [value#2291, split(value#2291, ,, -1) AS 
split_values#2293]
      :        +- ~StreamingExecutionRelation 
FileStreamSource[file:/__w/spark/spark/python/target/a3c26663-6df8-412f-89e4-fb263c45e03e/tmpnt2874xl],
 [value#2291]
      +- LocalRelation <empty>
   
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:372)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
   Caused by: py4j.Py4JException: An exception was raised by the Python Proxy. 
Return Message: Traceback (most recent call last):
     File 
"/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 
644, in _call_proxy
       return_value = getattr(self.pool[obj_id], method)(*params)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
       raise e
     File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
       self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
     File 
"/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py",
 line 1043, in check_results
       assert map_state_df.selectExpr(
              ^^^^^^^^^^^^^^^^^^^^^^^^
   AssertionError
   
        at py4j.Protocol.getReturnValue(Protocol.java:476)
        at 
py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
        at jdk.proxy3/jdk.proxy3.$Proxy45.call(Unknown Source)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:87)
        at 
org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:87)
        at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56)
        at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:879)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
        at 
org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
        at 
org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106)
        at 
org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:876)
        at 
org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:876)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:394)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at 
org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:364)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:344)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:344)
        at 
org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
        at 
org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:70)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:82)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:344)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
        ... 1 more
   ```
   
   also cc @HeartSaVioR 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to