anishshri-db commented on code in PR #54373:
URL: https://github.com/apache/spark/pull/54373#discussion_r2849134593
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/StreamExecution.scala:
##########
@@ -337,6 +338,33 @@ abstract class StreamExecution(
}
getLatestExecutionContext().updateStatusMessage("Initializing sources")
+
+ // Restore SQL configs from checkpoint before evaluating logicalPlan.
+ // For existing queries, checkpoint config values take precedence over session config.
+ // This ensures stable behavior when resuming a query, as the checkpoint contains
+ // the configuration that was used when the query was first created.
+ // Session config values apply only to new queries being started.
+ offsetLog.getLatest().foreach { case (_, offsetSeq) =>
+ offsetSeq.metadataOpt.foreach { metadata =>
+ val checkpointEnforcement =
+ OffsetSeqMetadata.readValueOpt(
+ metadata, SQLConf.ENABLE_STREAMING_SOURCE_EVOLUTION).getOrElse("false")
+ val sessionEnforcement =
+ sparkSessionForStream.sessionState.conf.enableStreamingSourceEvolution.toString
+
+ // Log a warning if session config differs from checkpoint config
+ if (sessionEnforcement != checkpointEnforcement) {
Review Comment:
nit: we can remove this log, since it is implied. If you really want to keep it,
let's make it `INFO`.
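
For illustration only, here is a minimal standalone sketch of the precedence rule described in the code comment, with the mismatch reported at `INFO` rather than as a warning. `CheckpointConfigSketch` and `resolve` are hypothetical names, not Spark APIs, and `println` stands in for the real logging call; the `"false"` fallback mirrors the `getOrElse("false")` in the diff.

```scala
object CheckpointConfigSketch {
  // Hypothetical helper, not part of Spark. For a resumed query the value stored
  // in the checkpoint wins; a key missing from the checkpoint falls back to
  // "false", mirroring the getOrElse("false") in the diff.
  def resolve(checkpointValue: Option[String], sessionValue: String): (String, Option[String]) = {
    val effective = checkpointValue.getOrElse("false")
    // Only produce a message when the session value differs from the effective one.
    val note =
      if (effective != sessionValue) {
        Some(s"Using checkpoint value '$effective'; session value '$sessionValue' is ignored")
      } else {
        None
      }
    (effective, note)
  }

  def main(args: Array[String]): Unit = {
    val (effective, note) = resolve(Some("false"), sessionValue = "true")
    // Stand-in for an INFO-level log call; a real patch would use Spark's logging.
    note.foreach(msg => println(s"INFO $msg"))
    println(s"effective = $effective")
  }
}
```

The point of the sketch is that the mismatch does not change behavior (the checkpoint value is used either way), which is why a warning seems stronger than needed.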
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]