[ https://issues.apache.org/jira/browse/SPARK-50908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-50908:
---------------------------------
    Summary: Fix flaky pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests  (was: Fix flay pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests)

> Fix flaky pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-50908
>                 URL: https://issues.apache.org/jira/browse/SPARK-50908
>             Project: Spark
>          Issue Type: Sub-task
>          Components: PySpark, Structured Streaming
>    Affects Versions: 4.0.0
>            Reporter: Hyukjin Kwon
>            Priority: Major
>
> https://github.com/apache/spark/actions/runs/12883552117/job/35918143550
> {code}
> ======================================================================
> ERROR [10.759s]: test_value_state_ttl_expiration (pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 422, in test_value_state_ttl_expiration
>     q.processAllAvailable()
>   File "/__w/spark/spark/python/pyspark/sql/streaming/query.py", line 351, in processAllAvailable
>     return self._jsq.processAllAvailable()
>   File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__
>     return_value = get_return_value(
>   File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 253, in deco
>     raise converted from None
> pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = 8e860141-b4c1-445b-be17-7f46f4a8e1dc, runId = 8b11487b-67b9-4cfb-9176-a421032ff236] terminated with exception: [FOREACH_BATCH_USER_FUNCTION_ERROR] An error occurred in the user provided function in foreach batch sink. Reason: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>   File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call
>     raise e
>   File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call
>     self.func(DataFrame(jdf, wrapped_session_jdf), batch_id)
>   File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 374, in check_results
>     assertDataFrameEqual(
>   File "/__w/spark/spark/python/pyspark/testing/utils.py", line 1039, in assertDataFrameEqual
>     assert_rows_equal(actual_list, expected_list, maxErrors=maxErrors, showOnlyDiff=showOnlyDiff)
>   File "/__w/spark/spark/python/pyspark/testing/utils.py", line 995, in assert_rows_equal
>     raise PySparkAssertionError(
> pyspark.errors.exceptions.base.PySparkAssertionError: [DIFFERENT_ROWS] Results do not match: ( 37.50000 % )
> *** actual ***
>   Row(id='count-0', count=3)
>   Row(id='count-1', count=3)
> ! Row(id='ttl-count-0', count=2)
>   Row(id='ttl-count-1', count=3)
> ! Row(id='ttl-list-state-count-0', count=3)
>   Row(id='ttl-list-state-count-1', count=7)
> ! Row(id='ttl-map-state-count-0', count=2)
>   Row(id='ttl-map-state-count-1', count=3)
> *** expected ***
>   Row(id='count-0', count=3)
>   Row(id='count-1', count=3)
> ! Row(id='ttl-count-0', count=1)
>   Row(id='ttl-count-1', count=3)
> ! Row(id='ttl-list-state-count-0', count=1)
>   Row(id='ttl-list-state-count-1', count=7)
> ! Row(id='ttl-map-state-count-0', count=1)
>   Row(id='ttl-map-state-count-1', count=3)
> SQLSTATE: 39000 SQLSTATE: XXKST
> === Streaming Query ===
> Identifier: [id = 8e860141-b4c1-445b-be17-7f46f4a8e1dc, runId = 8b11487b-67b9-4cfb-9176-a421032ff236]
> Current Committed Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/c80337ee-345b-4c00-b9b5-bfaf49d18854/tmpg_utvcka]: {"logOffset":1}}
> Current Available Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/c80337ee-345b-4c00-b9b5-bfaf49d18854/tmpg_utvcka]: {"logOffset":2}}
> Current State: ACTIVE
> Thread State: RUNNABLE
> Logical Plan:
> ~WriteToMicroBatchDataSourceV1 ForeachBatchSink, 8e860141-b4c1-445b-be17-7f46f4a8e1dc, Update
> +- ~TransformWithStateInPandas transformWithStateUDF(id#15428, temperature#15429)#15430, 1, [id#15431, count#15432], Update, ProcessingTime, false, 0
>    :- ~Project [id#15428, id#15428, temperature#15429]
>    :  +- ~Project [cast(split_values#15425[0] as string) AS id#15428, cast(split_values#15425[1] as int) AS temperature#15429]
>    :     +- ~Project [value#15423, split(value#15423, ,, -1) AS split_values#15425]
>    :        +- ~StreamingExecutionRelation FileStreamSource[file:/__w/spark/spark/python/target/c80337ee-345b-4c00-b9b5-bfaf49d18854/tmpg_utvcka], [value#15423]
>    +- LocalRelation <empty>
> JVM stacktrace:
> org.apache.spark.sql.streaming.StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy
>   at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:105)
>   at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:110)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:791)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:875)
>   at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:875)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:393)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
>   at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:363)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:343)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:343)
>   at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
>   at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:70)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:82)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:343)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
>   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:791)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
>   ... 1 more
> {code}
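For context, the assertion that flakes here is pyspark.testing.assertDataFrameEqual, invoked from the test's check_results callback inside a foreachBatch sink. Below is a minimal, hypothetical sketch (not taken from the Spark test suite) of how that API reports a [DIFFERENT_ROWS] mismatch; the row values are invented to mirror the mismatched ttl-count-0 pair from the log, and the local SparkSession setup is illustrative only.

{code}
# Hypothetical, self-contained sketch (not from the Spark test suite): how
# pyspark.testing.assertDataFrameEqual reports the [DIFFERENT_ROWS] failure
# seen in the log above. Row values are invented to mirror the mismatched
# 'ttl-count-0' pair (actual count=2 vs expected count=1).
from pyspark.sql import SparkSession
from pyspark.testing import assertDataFrameEqual
from pyspark.errors import PySparkAssertionError

spark = SparkSession.builder.master("local[1]").getOrCreate()

# What the batch actually produced: the TTL'd state survived an extra batch.
actual = spark.createDataFrame([("ttl-count-0", 2)], ["id", "count"])
# What the test expects after the TTL has expired.
expected = spark.createDataFrame([("ttl-count-0", 1)], ["id", "count"])

try:
    assertDataFrameEqual(actual, expected)
except PySparkAssertionError as e:
    # Prints "[DIFFERENT_ROWS] Results do not match: ( 100.00000 % )" plus
    # the "! Row(...)" actual/expected diff, as in the traceback above.
    print(e)
{code}

Notably, all three mismatched rows (3 of 8, the 37.50000 % in the log) are the TTL-backed value, list, and map states for key 0, with actual counts above the expected ones. That pattern is consistent with state that had not yet expired when the later batch ran, which would make the test timing-dependent on CI.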