xin-aurora commented on code in PR #50552:
URL: https://github.com/apache/spark/pull/50552#discussion_r2059321248


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowFunctionFrame.scala:
##########
@@ -180,10 +180,20 @@ abstract class OffsetWindowFunctionFrameBase(
     }
   }
 
+  /** Indicates whether the default values are Literal values. */
+  protected lazy val onlyLiteralNulls = expressions.forall { e =>
+    e.default == null || (e.default.foldable && e.default.eval() == null)

Review Comment:
   @cloud-fan I changed the code to `e.default == null || e.default.foldable` 
and updated the repository to the latest branch. The 
`KafkaMicroBatchV2SourceWithConsumerSuite` test no longer produces the error.
   
   I am not sure about which specific foldable expression caused the test 
failed. The previous error was related to `Query with Trigger.AvailableNow`. 
Here are part of the error messages: 
   ```
   [info] - Query with Trigger.AvailableNow should throw error when topic 
partitions got unavailable during subsequent batches *** FAILED *** (1 minute)
   [info]   java.lang.AssertionError: assertion failed: Exception tree doesn't 
contain the expected exception with message: Some of partitions in Kafka 
topic(s) have been lost during running query with Trigger.AvailableNow.
   [info] org.scalatest.exceptions.TestFailedException: isPropagated was false 
Partition [topic-40, 1] metadata not propagated after timeout
   ...
   [info]       at 
org.apache.spark.sql.kafka010.KafkaTestUtils.waitUntilMetadataIsPropagated(KafkaTestUtils.scala:614)
   [info]       at 
org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$createTopic$1(KafkaTestUtils.scala:379)
   [info]       at 
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:192)
   [info]       at 
org.apache.spark.sql.kafka010.KafkaTestUtils.createTopic(KafkaTestUtils.scala:378)
   [info]       at 
org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11(KafkaMicroBatchSourceSuite.scala:351)
   [info]       at 
org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$11$adapted(KafkaMicroBatchSourceSuite.scala:348)
   [info]       at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.callBatchWriter(ForeachBatchSink.scala:56)
   [info]       at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:49)
   [info]       at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:880)
   [info]       at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$8(SQLExecution.scala:162)
   [info]       at 
org.apache.spark.sql.execution.SQLExecution$.withSessionTagsApplied(SQLExecution.scala:268)
   [info]       at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$7(SQLExecution.scala:124)
   ...
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to