anishshri-db commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1902148602
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -163,25 +157,47 @@ class StateSchemaCompatibilityChecker(
   private def check(
       oldSchema: StateStoreColFamilySchema,
       newSchema: StateStoreColFamilySchema,
-      ignoreValueSchema: Boolean) : Unit = {
+      ignoreValueSchema: Boolean,
+      schemaEvolutionEnabled: Boolean): StateStoreColFamilySchema = {
+
+    def incrementSchemaId(id: Short): Short = (id + 1).toShort
+
+    // Initialize with old schema IDs
+    var resultSchema = newSchema.copy(
+      keySchemaId = oldSchema.keySchemaId,
+      valueSchemaId = oldSchema.valueSchemaId
+    )
     val (storedKeySchema, storedValueSchema) = (oldSchema.keySchema,
       oldSchema.valueSchema)
     val (keySchema, valueSchema) = (newSchema.keySchema, newSchema.valueSchema)
 
     if (storedKeySchema.equals(keySchema) &&
       (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
       // schema is exactly same
+      oldSchema
     } else if (!schemasCompatible(storedKeySchema, keySchema)) {
       throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
         keySchema.toString)
+    } else if (!ignoreValueSchema && schemaEvolutionEnabled) {
+      // Check value schema evolution
+      val oldAvroSchema = SchemaConverters.toAvroTypeWithDefaults(storedValueSchema)
+      val newAvroSchema = SchemaConverters.toAvroTypeWithDefaults(valueSchema)
+
+      val validator = new SchemaValidatorBuilder().canReadStrategy.validateAll()

Review Comment:
   What happens if new schema is not compatible with old for evolution ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -163,25 +157,47 @@ class StateSchemaCompatibilityChecker(
   private def check(
       oldSchema: StateStoreColFamilySchema,
       newSchema: StateStoreColFamilySchema,
-      ignoreValueSchema: Boolean) : Unit = {
+      ignoreValueSchema: Boolean,
+      schemaEvolutionEnabled: Boolean): StateStoreColFamilySchema = {
+
+    def incrementSchemaId(id: Short): Short = (id + 1).toShort
+
+    // Initialize with old schema IDs
+    var resultSchema = newSchema.copy(
+      keySchemaId = oldSchema.keySchemaId,
+      valueSchemaId = oldSchema.valueSchemaId
+    )
     val (storedKeySchema, storedValueSchema) = (oldSchema.keySchema,
       oldSchema.valueSchema)
     val (keySchema, valueSchema) = (newSchema.keySchema, newSchema.valueSchema)
 
     if (storedKeySchema.equals(keySchema) &&
       (ignoreValueSchema || storedValueSchema.equals(valueSchema))) {
       // schema is exactly same
+      oldSchema
     } else if (!schemasCompatible(storedKeySchema, keySchema)) {
       throw StateStoreErrors.stateStoreKeySchemaNotCompatible(storedKeySchema.toString,
         keySchema.toString)
+    } else if (!ignoreValueSchema && schemaEvolutionEnabled) {
+      // Check value schema evolution
+      val oldAvroSchema = SchemaConverters.toAvroTypeWithDefaults(storedValueSchema)
+      val newAvroSchema = SchemaConverters.toAvroTypeWithDefaults(valueSchema)
+
+      val validator = new SchemaValidatorBuilder().canReadStrategy.validateAll()
+      validator.validate(newAvroSchema, Iterable(oldAvroSchema).asJava)
+
+      // Schema evolved - increment value schema ID
+      resultSchema.copy(valueSchemaId = incrementSchemaId(oldSchema.valueSchemaId))
     } else if (!ignoreValueSchema && !schemasCompatible(storedValueSchema, valueSchema)) {
       throw StateStoreErrors.stateStoreValueSchemaNotCompatible(storedValueSchema.toString,
         valueSchema.toString)
     } else {
       logInfo("Detected schema change which is compatible. Allowing to put rows.")
+      oldSchema
     }
   }
+

Review Comment:
   nit: extra newline ?
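On the question above about incompatible evolution: Avro's `SchemaValidator.validate` throws a checked `SchemaValidationException` when the schema under validation cannot read the existing schemas, so with `canReadStrategy` an incompatible change surfaces as that exception rather than a StateStore error unless the caller wraps it. A minimal standalone sketch of that failure path using plain Avro APIs (the record shapes here are illustrative, not taken from the PR):

```scala
import scala.jdk.CollectionConverters._

import org.apache.avro.{Schema, SchemaBuilder, SchemaValidationException, SchemaValidatorBuilder}

object EvolutionCheckSketch {
  def main(args: Array[String]): Unit = {
    // Stored (writer) schema: a value row with a long counter.
    val oldAvroSchema: Schema = SchemaBuilder.record("Value").fields()
      .requiredLong("count")
      .endRecord()
    // Proposed (reader) schema: the same field retyped to string, which is
    // not a legal Avro promotion, so the new schema cannot read old rows.
    val newAvroSchema: Schema = SchemaBuilder.record("Value").fields()
      .requiredString("count")
      .endRecord()

    // Same validator construction as in the hunk above.
    val validator = new SchemaValidatorBuilder().canReadStrategy.validateAll()
    try {
      validator.validate(newAvroSchema, Iterable(oldAvroSchema).asJava)
    } catch {
      case e: SchemaValidationException =>
        // The failure path the comment asks about: validate() throws here, and
        // the caller decides whether to wrap it in a store-specific error.
        println(s"Incompatible evolution rejected: ${e.getMessage}")
    }
  }
}
```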
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -319,7 +347,8 @@ trait StateStoreWriter
 
   /** Metadata of this stateful operator and its states stores. */
   def operatorStateMetadata(
-      stateSchemaPaths: List[String] = List.empty): OperatorStateMetadata = {
+      stateSchemaPaths: List[List[String]] = List.empty
+  ): OperatorStateMetadata = {

Review Comment:
   nit: move to line above ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TimerStateImpl.scala:
##########
@@ -35,13 +35,19 @@ object TimerStateUtils {
   val KEY_TO_TIMESTAMP_CF = "_keyToTimestamp"
   val TIMESTAMP_TO_KEY_CF = "_timestampToKey"
 
-  def getTimerStateVarName(timeMode: String): String = {
+  def getTimerStateVarNames(timeMode: String): (String, String) = {
     assert(timeMode == TimeMode.EventTime.toString ||
       timeMode == TimeMode.ProcessingTime.toString)
-    if (timeMode == TimeMode.EventTime.toString) {
+    val primaryIndex = if (timeMode == TimeMode.EventTime.toString) {

Review Comment:
   nit: i guess this could also be split into a separate function ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -192,41 +208,56 @@ class StateSchemaCompatibilityChecker(
   def validateAndMaybeEvolveStateSchema(
       newStateSchema: List[StateStoreColFamilySchema],
       ignoreValueSchema: Boolean,
-      stateSchemaVersion: Int): Boolean = {
+      stateSchemaVersion: Int,
+      schemaEvolutionEnabled: Boolean): Boolean = {
     val existingStateSchemaList = getExistingKeyAndValueSchema()
-    val newStateSchemaList = newStateSchema
 
     if (existingStateSchemaList.isEmpty) {
-      // write the schema file if it doesn't exist
-      createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion)
+      // Initialize schemas with ID 0 when no existing schema
+      val initializedSchemas = newStateSchema.map(schema =>
+        schema.copy(keySchemaId = 0, valueSchemaId = 0)
+      )
+      createSchemaFile(initializedSchemas.sortBy(_.colFamilyName), stateSchemaVersion)
       true
    } else {
-      // validate if the new schema is compatible with the existing schema
-      val existingSchemaMap = existingStateSchemaList.map { schema =>
+      val existingSchemaMap = existingStateSchemaList.map(schema =>
        schema.colFamilyName -> schema
-      }.toMap
-      // For each new state variable, we want to compare it to the old state variable
-      // schema with the same name
-      newStateSchemaList.foreach { newSchema =>
-        existingSchemaMap.get(newSchema.colFamilyName).foreach { existingStateSchema =>
-          check(existingStateSchema, newSchema, ignoreValueSchema)
-        }
+      ).toMap
+
+      // Process each new schema and track if any have evolved
+      val (evolvedSchemas, hasEvolutions) = newStateSchema.foldLeft(
+        (List.empty[StateStoreColFamilySchema], false)) {
+        case ((schemas, evolved), newSchema) =>
+          existingSchemaMap.get(newSchema.colFamilyName) match {
+            case Some(existingSchema) =>
+              val updatedSchema = check(
+                existingSchema, newSchema, ignoreValueSchema, schemaEvolutionEnabled)
+              val hasEvolved = !updatedSchema.equals(existingSchema)
+              (updatedSchema :: schemas, evolved || hasEvolved)
+            case None =>
+              // New column family - initialize with schema ID 0
+              val newSchemaWithIds = newSchema.copy(keySchemaId = 0, valueSchemaId = 0)
+              (newSchemaWithIds :: schemas, true)
+          }
      }
+
      val colFamiliesAddedOrRemoved =
-        (newStateSchemaList.map(_.colFamilyName).toSet != existingSchemaMap.keySet)
-      if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) {
-        createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion)
+        (newStateSchema.map(_.colFamilyName).toSet != existingSchemaMap.keySet)
+      val newSchemaFileWritten = hasEvolutions || colFamiliesAddedOrRemoved
+
+      if (stateSchemaVersion == SCHEMA_FORMAT_V3 && newSchemaFileWritten) {
+        createSchemaFile(evolvedSchemas.sortBy(_.colFamilyName), stateSchemaVersion)
      }
-      // TODO: [SPARK-49535] Write Schema files after schema has changed for StateSchemaV3
-      colFamiliesAddedOrRemoved
+
+      newSchemaFileWritten
    }
  }
 
   private def schemaFile(storeCpLocation: Path): Path =
     new Path(new Path(storeCpLocation, "_metadata"), "schema")
 }
 
-object StateSchemaCompatibilityChecker {
+object StateSchemaCompatibilityChecker extends Logging {

Review Comment:
   nit: adding `Logging` intentional ?
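An aside on the `foldLeft` in the hunk above: it rebuilds the column-family schema list while threading a single "evolved" flag, and that flag is what later gates whether a new schema file is written. A stripped-down sketch of the same accumulation pattern (the tuple type and `checkOne` helper are hypothetical stand-ins, not the PR's actual code):

```scala
object FoldEvolutionSketch {
  // Hypothetical stand-in for StateStoreColFamilySchema: (colFamilyName, schemaId).
  type ColFamilySchema = (String, Short)

  // Hypothetical stand-in for check(): bump the schema ID if the schema changed.
  def checkOne(existing: ColFamilySchema, incoming: ColFamilySchema): ColFamilySchema =
    if (existing == incoming) existing else (incoming._1, (existing._2 + 1).toShort)

  def main(args: Array[String]): Unit = {
    val existing = Map("default" -> ("default", 0: Short))
    val incoming = List(("default", 0: Short), ("timers", 0: Short))

    // Same shape as the PR's foldLeft: accumulate (updated list, anyEvolved flag).
    val (updated, anyEvolved) =
      incoming.foldLeft((List.empty[ColFamilySchema], false)) {
        case ((acc, evolved), schema) =>
          existing.get(schema._1) match {
            case Some(old) =>
              val next = checkOne(old, schema)
              (next :: acc, evolved || next != old)
            case None =>
              // A new column family always counts as an evolution.
              (schema :: acc, true)
          }
      }
    // "timers" is new, so anyEvolved is true even though "default" is unchanged.
    println(s"updated=${updated.reverse}, anyEvolved=$anyEvolved")
  }
}
```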
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -532,10 +533,12 @@ object StateStoreProvider {
       useColumnFamilies: Boolean,
       storeConf: StateStoreConf,
       hadoopConf: Configuration,
-      useMultipleValuesPerKey: Boolean): StateStoreProvider = {
+      useMultipleValuesPerKey: Boolean,
+      stateSchemaProvider: Option[StateSchemaProvider]
+  ): StateStoreProvider = {

Review Comment:
   nit: move to line above ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -233,6 +234,33 @@ trait StateStoreWriter
     }
   }
 
+  def stateSchemaList(
+      stateSchemaValidationResults: List[StateSchemaValidationResult],
+      oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = {
+
+    def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = {

Review Comment:
   indent seems off ?


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org