attilapiros commented on PR #50550: URL: https://github.com/apache/spark/pull/50550#issuecomment-2807650181
@HeartSaVioR, @anishshri-db Could this cause the following failure in the branch-4.0? ``` ====================================================================== ERROR [27.008s]: test_transform_with_state_with_wmark_and_non_event_time (pyspark.sql.tests.pandas.test_pandas_transform_with_state.TransformWithStateInPandasTests.test_transform_with_state_with_wmark_and_non_event_time) ---------------------------------------------------------------------- Traceback (most recent call last): File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 654, in test_transform_with_state_with_wmark_and_non_event_time self._test_transform_with_state_in_pandas_event_time( File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 594, in _test_transform_with_state_in_pandas_event_time q.processAllAvailable() File "/__w/spark/spark/python/pyspark/sql/streaming/query.py", line 351, in processAllAvailable return self._jsq.processAllAvailable() ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/java_gateway.py", line 1362, in __call__ return_value = get_return_value( ^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/pyspark/errors/exceptions/captured.py", line 258, in deco raise converted from None pyspark.errors.exceptions.captured.StreamingQueryException: [STREAM_FAILED] Query [id = d597891d-f801-4437-8fee-ce409a254005, runId = f5e21d21-eaea-404d-84c5-fab9aea96fa5] terminated with exception: [FOREACH_BATCH_USER_FUNCTION_ERROR] An error occurred in the user provided function in foreach batch sink. Reason: An exception was raised by the Python Proxy. 
Return Message: Traceback (most recent call last): File "/__w/spark/spark/python/lib/py4j-0.10.9.9-src.zip/py4j/clientserver.py", line 644, in _call_proxy return_value = getattr(self.pool[obj_id], method)(*params) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/__w/spark/spark/python/pyspark/sql/utils.py", line 165, in call raise e File "/__w/spark/spark/python/pyspark/sql/utils.py", line 162, in call self.func(DataFrame(jdf, wrapped_session_jdf), batch_id) File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 646, in check_results assert set(batch_df.sort("id").collect()) == { ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AssertionError SQLSTATE: 39000 SQLSTATE: XXKST === Streaming Query === Identifier: event_time_test_query [id = d597891d-f801-4437-8fee-ce409a254005, runId = f5e21d21-eaea-404d-84c5-fab9aea96fa5] Current Committed Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/ac4c6aca-8848-48cf-aaf2-8803f0578fb6/tmpltmsgq6q]: {"logOffset":2}} Current Available Offsets: {FileStreamSource[file:/__w/spark/spark/python/target/ac4c6aca-8848-48cf-aaf2-8803f0578fb6/tmpltmsgq6q]: {"logOffset":2}} Current State: ACTIVE Thread State: RUNNABLE Logical Plan: ~WriteToMicroBatchDataSourceV1 ForeachBatchSink, d597891d-f801-4437-8fee-ce409a254005, [queryName=event_time_test_query], Update +- ~TransformWithStateInPandas transformWithStateUDF(id#15021, eventTime#15024-T10000ms)#15025, 1, [id#15026, timestamp#15027], Update, ProcessingTime, false, 0 :- ~Project [id#15021, id#15021, eventTime#15024-T10000ms] self.func(DataFrame(jdf, wrapped_session_jdf), batch_id) File "/__w/spark/spark/python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py", line 646, in check_results assert set(batch_df.sort("id").collect()) == { ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AssertionError at py4j.Protocol.getReturnValue(Protocol.java:476) at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108) at 
jdk.proxy3/jdk.proxy3.$Proxy45.call(Unknown Source) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:87) at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:87) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56) at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:879) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162) at org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94) at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112) at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:106) at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:124) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:291) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:123) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:77) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:233) at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:876) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:876) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:394) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:186) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:364) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39) at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:70) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:82) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:344) at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311) ... 1 more ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org