anishshri-db commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1902148602


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -163,25 +157,47 @@ class StateSchemaCompatibilityChecker(
   private def check(
       oldSchema: StateStoreColFamilySchema,
       newSchema: StateStoreColFamilySchema,
-      ignoreValueSchema: Boolean) : Unit = {
+      ignoreValueSchema: Boolean,
+      schemaEvolutionEnabled: Boolean): StateStoreColFamilySchema = {
+
+    def incrementSchemaId(id: Short): Short = (id + 1).toShort
+
+    // Initialize with old schema IDs
+    var resultSchema = newSchema.copy(
+      keySchemaId = oldSchema.keySchemaId,
+      valueSchemaId = oldSchema.valueSchemaId
+    )
     val (storedKeySchema, storedValueSchema) = (oldSchema.keySchema,
       oldSchema.valueSchema)
     val (keySchema, valueSchema) = (newSchema.keySchema, newSchema.valueSchema)
 
     if (storedKeySchema.equals(keySchema) &&
       (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
       // schema is exactly same
+      oldSchema
     } else if (!schemasCompatible(storedKeySchema, keySchema)) {
       throw 
StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
         keySchema.toString)
+    } else if (!ignoreValueSchema && schemaEvolutionEnabled) {
+      // Check value schema evolution
+      val oldAvroSchema = 
SchemaConverters.toAvroTypeWithDefaults(storedValueSchema)
+      val newAvroSchema = SchemaConverters.toAvroTypeWithDefaults(valueSchema)
+
+      val validator = new 
SchemaValidatorBuilder().canReadStrategy.validateAll()

Review Comment:
   What happens if new schema is not compatible with old for evolution ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -163,25 +157,47 @@ class StateSchemaCompatibilityChecker(
   private def check(
       oldSchema: StateStoreColFamilySchema,
       newSchema: StateStoreColFamilySchema,
-      ignoreValueSchema: Boolean) : Unit = {
+      ignoreValueSchema: Boolean,
+      schemaEvolutionEnabled: Boolean): StateStoreColFamilySchema = {
+
+    def incrementSchemaId(id: Short): Short = (id + 1).toShort
+
+    // Initialize with old schema IDs
+    var resultSchema = newSchema.copy(
+      keySchemaId = oldSchema.keySchemaId,
+      valueSchemaId = oldSchema.valueSchemaId
+    )
     val (storedKeySchema, storedValueSchema) = (oldSchema.keySchema,
       oldSchema.valueSchema)
     val (keySchema, valueSchema) = (newSchema.keySchema, newSchema.valueSchema)
 
     if (storedKeySchema.equals(keySchema) &&
       (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
       // schema is exactly same
+      oldSchema
     } else if (!schemasCompatible(storedKeySchema, keySchema)) {
       throw 
StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
         keySchema.toString)
+    } else if (!ignoreValueSchema && schemaEvolutionEnabled) {
+      // Check value schema evolution
+      val oldAvroSchema = 
SchemaConverters.toAvroTypeWithDefaults(storedValueSchema)
+      val newAvroSchema = SchemaConverters.toAvroTypeWithDefaults(valueSchema)
+
+      val validator = new 
SchemaValidatorBuilder().canReadStrategy.validateAll()
+      validator.validate(newAvroSchema, Iterable(oldAvroSchema).asJava)
+
+      // Schema evolved - increment value schema ID
+      resultSchema.copy(valueSchemaId = 
incrementSchemaId(oldSchema.valueSchemaId))
     } else if (!ignoreValueSchema && !schemasCompatible(storedValueSchema, 
valueSchema)) {
       throw 
StateStoreErrors.stateStoreValueSchemaNotCompatible(storedValueSchema.toString,
         valueSchema.toString)
     } else {
       logInfo("Detected schema change which is compatible. Allowing to put 
rows.")
+      oldSchema
     }
   }
 
+

Review Comment:
   nit: extra newline ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -319,7 +347,8 @@ trait StateStoreWriter
 
   /** Metadata of this stateful operator and its states stores. */
   def operatorStateMetadata(
-      stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = {
+      stateSchemaPaths: List[List[String]] = List.empty
+  ): OperatorStateMetadata = {

Review Comment:
   nit: move to line above ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##########
@@ -35,13 +35,19 @@ object TimerStateUtils {
   val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
   val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
 
-  def getTimerStateVarName(timeMode: String): String = {
+  def getTimerStateVarNames(timeMode: String): (String, String) = {
     assert(timeMode == TimeMode.EventTime.toString || timeMode == 
TimeMode.ProcessingTime.toString)
-    if (timeMode == TimeMode.EventTime.toString) {
+    val primaryIndex = if (timeMode == TimeMode.EventTime.toString) {

Review Comment:
   nit: i guess this could also be split into a separate function ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -192,41 +208,56 @@ class StateSchemaCompatibilityChecker(
   def validateAndMaybeEvolveStateSchema(
       newStateSchema: List[StateStoreColFamilySchema],
       ignoreValueSchema: Boolean,
-      stateSchemaVersion: Int): Boolean = {
+      stateSchemaVersion: Int,
+      schemaEvolutionEnabled: Boolean): Boolean = {
     val existingStateSchemaList = getExistingKeyAndValueSchema()
-    val newStateSchemaList = newStateSchema
 
     if (existingStateSchemaList.isEmpty) {
-      // write the schema file if it doesn't exist
-      createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), 
stateSchemaVersion)
+      // Initialize schemas with ID 0 when no existing schema
+      val initializedSchemas = newStateSchema.map(schema =>
+        schema.copy(keySchemaId = 0, valueSchemaId = 0)
+      )
+      createSchemaFile(initializedSchemas.sortBy(_.colFamilyName), 
stateSchemaVersion)
       true
     } else {
-      // validate if the new schema is compatible with the existing schema
-      val existingSchemaMap = existingStateSchemaList.map { schema =>
+      val existingSchemaMap = existingStateSchemaList.map(schema =>
         schema.colFamilyName -> schema
-      }.toMap
-      // For each new state variable, we want to compare it to the old state 
variable
-      // schema with the same name
-      newStateSchemaList.foreach { newSchema =>
-        existingSchemaMap.get(newSchema.colFamilyName).foreach { 
existingStateSchema =>
-          check(existingStateSchema, newSchema, ignoreValueSchema)
-        }
+      ).toMap
+
+      // Process each new schema and track if any have evolved
+      val (evolvedSchemas, hasEvolutions) = newStateSchema.foldLeft(
+        (List.empty[StateStoreColFamilySchema], false)) {
+        case ((schemas, evolved), newSchema) =>
+          existingSchemaMap.get(newSchema.colFamilyName) match {
+            case Some(existingSchema) =>
+              val updatedSchema = check(
+                existingSchema, newSchema, ignoreValueSchema, 
schemaEvolutionEnabled)
+              val hasEvolved = !updatedSchema.equals(existingSchema)
+              (updatedSchema :: schemas, evolved || hasEvolved)
+            case None =>
+              // New column family - initialize with schema ID 0
+              val newSchemaWithIds = newSchema.copy(keySchemaId = 0, 
valueSchemaId = 0)
+              (newSchemaWithIds :: schemas, true)
+          }
       }
+
       val colFamiliesAddedOrRemoved =
-        (newStateSchemaList.map(_.colFamilyName).toSet != 
existingSchemaMap.keySet)
-      if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) 
{
-        createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), 
stateSchemaVersion)
+        (newStateSchema.map(_.colFamilyName).toSet != existingSchemaMap.keySet)
+      val newSchemaFileWritten = hasEvolutions || colFamiliesAddedOrRemoved
+
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3 && newSchemaFileWritten) {
+        createSchemaFile(evolvedSchemas.sortBy(_.colFamilyName), 
stateSchemaVersion)
       }
-      // TODO: [SPARK-49535] Write Schema files after schema has changed for 
StateSchemaV3
-      colFamiliesAddedOrRemoved
+
+      newSchemaFileWritten
     }
   }
 
   private def schemaFile(storeCpLocation: Path): Path =
     new Path(new Path(storeCpLocation, "_metadata"), "schema")
 }
 
-object StateSchemaCompatibilityChecker {
+object StateSchemaCompatibilityChecker extends Logging {

Review Comment:
   nit: adding `Logging` intentional ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -532,10 +533,12 @@ object StateStoreProvider {
       useColumnFamilies: Boolean,
       storeConf: StateStoreConf,
       hadoopConf: Configuration,
-      useMultipleValuesPerKey: Boolean): StateStoreProvider = {
+      useMultipleValuesPerKey: Boolean,
+      stateSchemaProvider: Option[StateSchemaProvider]
+  ): StateStoreProvider = {

Review Comment:
   nit: move to line above ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -233,6 +234,33 @@ trait StateStoreWriter
     }
   }
 
+  def stateSchemaList(
+      stateSchemaValidationResults: List[StateSchemaValidationResult],
+      oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = {
+
+  def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = {

Review Comment:
   indent seems off ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to