Hyukjin Kwon created SPARK-50908:
------------------------------------

             Summary: Fix flaky pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests
                 Key: SPARK-50908
                 URL: https://issues.apache.org/jira/browse/SPARK-50908
             Project: Spark
          Issue Type: Sub-task
          Components: PySpark, Structured Streaming
    Affects Versions: 4.0.0
            Reporter: Hyukjin Kwon
https://github.com/apache/spark/actions/runs/12883552117/job/35918143550

{code}
======================================================================
ERROR [10.759s]: test_value_state_ttl_expiration (pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 422, in test_value_state_ttl_expiration
    q.processAllAvailable()
  File "/__w/spark/spark/python/pyspark/sql/streaming/query.py", line 351, in processAllAvailable
    return self._jsq.processAllAvailable()
  File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__
    return_value = get_return_value(
  File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 253, in deco
    raise converted from None
pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 8e860141-b4c1-445b-be17-7f46f4a8e1dc, runId = 8b11487b-67b9-4cfb-9176-a421032ff236] terminated with exception: [FOREACH_BATCH_USER_FUNCTION_ERROR] An error occurred in the user provided function in foreach batch sink. Reason: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
    raise e
  File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
    self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
  File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 374, in check_results
    assertDataFrameEqual(
  File "/__w/spark/spark/python/pyspark/testing/utils.py", line 1039, in assertDataFrameEqual
    assert_rows_equal(actual_list, expected_list, maxErrors=maxErrors, showOnlyDiff=showOnlyDiff)
  File "/__w/spark/spark/python/pyspark/testing/utils.py", line 995, in assert_rows_equal
    raise PySparkAssertionError(
pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 37.50000 % )
*** actual ***
  Row(id='count-0', count=3)
  Row(id='count-1', count=3)
! Row(id='ttl-count-0', count=2)
  Row(id='ttl-count-1', count=3)
! Row(id='ttl-list-state-count-0', count=3)
  Row(id='ttl-list-state-count-1', count=7)
! Row(id='ttl-map-state-count-0', count=2)
  Row(id='ttl-map-state-count-1', count=3)
*** expected ***
  Row(id='count-0', count=3)
  Row(id='count-1', count=3)
! Row(id='ttl-count-0', count=1)
  Row(id='ttl-count-1', count=3)
! Row(id='ttl-list-state-count-0', count=1)
  Row(id='ttl-list-state-count-1', count=7)
! Row(id='ttl-map-state-count-0', count=1)
  Row(id='ttl-map-state-count-1', count=3)
SQLSTATE: 39000 SQLSTATE: XXKST

=== Streaming Query ===
Identifier: [id = 8e860141-b4c1-445b-be17-7f46f4a8e1dc, runId = 8b11487b-67b9-4cfb-9176-a421032ff236]
Current Committed Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/c80337ee-345b-4c00-b9b5-bfaf49d18854/tmpg_utvcka]: {"logOffset":1}}
Current Available Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/c80337ee-345b-4c00-b9b5-bfaf49d18854/tmpg_utvcka]: {"logOffset":2}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
~WriteToMicroBatchDataSourceV1 ForeachBatchSink, 8e860141-b4c1-445b-be17-7f46f4a8e1dc, Update
+- ~TransformWithStateInPandas transformWithStateUDF(id#15428, temperature#15429)#15430, 1, [id#15431, count#15432], Update, ProcessingTime, false, 0
   :- ~Project [id#15428, id#15428, temperature#15429]
   :  +- ~Project [cast(split_values#15425[0] as string) AS id#15428, cast(split_values#15425[1] as int) AS temperature#15429]
   :     +- ~Project [value#15423, split(value#15423, ,, -1) AS split_values#15425]
   :        +- ~StreamingExecutionRelation FileStreamSource[file:/__w/spark/spark/python/target/c80337ee-345b-4c00-b9b5-bfaf49d18854/tmpg_utvcka], [value#15423]
   +- LocalRelation <empty>

JVM stacktrace:
org.apache.spark.sql.streaming.StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:105)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:791)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:875)
	at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:875)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:393)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:363)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:343)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:343)
	at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
	at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:70)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:82)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:343)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:791)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
	... 1 more
{code}
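For context on where the nondeterminism can come from: the ttl-* rows in the diff came out larger than expected, which is consistent with TTL'd state still being present in a batch where the test expected it to have already expired. State TTL in transformWithStateInPandas is evaluated against the wall clock, so scheduling delays on a slow CI runner can shift which micro-batch first observes the expiration. Below is a minimal sketch of the pattern the test exercises, not the actual test code; the processor class, state names, schema, and the 10s TTL value are all hypothetical.

{code:python}
import pandas as pd
from typing import Iterator

from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.streaming.stateful_processor import (
    StatefulProcessor,
    StatefulProcessorHandle,
)


class TTLCountProcessor(StatefulProcessor):
    """Hypothetical processor: keeps a per-key count in a value state with a TTL."""

    def init(self, handle: StatefulProcessorHandle) -> None:
        schema = StructType([StructField("count", IntegerType(), True)])
        # State registered with a 10s TTL (assumed value): an entry written more
        # than 10s of wall-clock time ago reads as absent in later batches.
        self.count_state = handle.getValueState("count", schema, ttl_duration_ms=10000)

    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
        # If the TTL has fired between batches, exists() is False and the count
        # restarts from zero; otherwise it keeps accumulating. This wall-clock
        # dependency is what makes timing-based assertions on ttl-* counts flaky.
        prev = self.count_state.get()[0] if self.count_state.exists() else 0
        new_count = prev + sum(len(pdf) for pdf in rows)
        self.count_state.update((new_count,))
        yield pd.DataFrame({"id": [key[0]], "count": [new_count]})

    def close(self) -> None:
        pass


# Usage sketch (TTL state requires processing-time semantics):
# df.groupBy("id").transformWithStateInPandas(
#     statefulProcessor=TTLCountProcessor(),
#     outputStructType="id string, count int",
#     outputMode="Update",
#     timeMode="ProcessingTime",
# )
{code}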