ericm-db commented on code in PR #49277: URL: https://github.com/apache/spark/pull/49277#discussion_r1906134720
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ########## @@ -192,41 +242,70 @@ class StateSchemaCompatibilityChecker( def validateAndMaybeEvolveStateSchema( newStateSchema: List[StateStoreColFamilySchema], ignoreValueSchema: Boolean, - stateSchemaVersion: Int): Boolean = { - val existingStateSchemaList = getExistingKeyAndValueSchema() - val newStateSchemaList = newStateSchema - - if (existingStateSchemaList.isEmpty) { - // write the schema file if it doesn't exist - createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion) + stateSchemaVersion: Int, + schemaEvolutionEnabled: Boolean): Boolean = { + val existingStateSchemaMap = readSchemaFiles() + val mostRecentColFamilies = getExistingKeyAndValueSchema().map(_.colFamilyName) + if (mostRecentColFamilies.isEmpty) { + // Initialize schemas with ID 0 when no existing schema + val initializedSchemas = newStateSchema.map(schema => + schema.copy(keySchemaId = 0, valueSchemaId = 0) + ) + createSchemaFile(initializedSchemas.sortBy(_.colFamilyName), stateSchemaVersion) true } else { - // validate if the new schema is compatible with the existing schema - val existingSchemaMap = existingStateSchemaList.map { schema => - schema.colFamilyName -> schema - }.toMap - // For each new state variable, we want to compare it to the old state variable - // schema with the same name - newStateSchemaList.foreach { newSchema => - existingSchemaMap.get(newSchema.colFamilyName).foreach { existingStateSchema => - check(existingStateSchema, newSchema, ignoreValueSchema) - } + + // Process each new schema and track if any have evolved + val (evolvedSchemas, hasEvolutions) = newStateSchema.foldLeft( + (List.empty[StateStoreColFamilySchema], false)) { + case ((schemas, evolved), newSchema) => + existingStateSchemaMap.get(newSchema.colFamilyName) match { + case Some(existingSchemas) => + val (updatedSchema, hasEvolved) = check( + existingSchemas, 
newSchema, ignoreValueSchema, schemaEvolutionEnabled) + if (oldSchemaFilePaths.size == conf.maxStateSchemaFiles && hasEvolved) { + throw StateStoreErrors.stateStoreSchemaFileThresholdExceeded( + oldSchemaFilePaths.size + 1, + conf.maxStateSchemaFiles, + List(newSchema.colFamilyName) + ) + } + (updatedSchema :: schemas, evolved || hasEvolved) + case None => + // New column family - initialize with schema ID 0 + val newSchemaWithIds = newSchema.copy(keySchemaId = 0, valueSchemaId = 0) + (newSchemaWithIds :: schemas, true) + } } - val colFamiliesAddedOrRemoved = - (newStateSchemaList.map(_.colFamilyName).toSet != existingSchemaMap.keySet) - if (stateSchemaVersion == SCHEMA_FORMAT_V3 && colFamiliesAddedOrRemoved) { - createSchemaFile(newStateSchemaList.sortBy(_.colFamilyName), stateSchemaVersion) + + val newColFamilies = newStateSchema.map(_.colFamilyName).toSet + val oldColFamilies = mostRecentColFamilies.toSet + val colFamiliesAddedOrRemoved = newColFamilies != oldColFamilies + val newSchemaFileWritten = hasEvolutions || colFamiliesAddedOrRemoved + + if (oldSchemaFilePaths.size == conf.maxStateSchemaFiles && + colFamiliesAddedOrRemoved) { + throw StateStoreErrors.stateStoreSchemaFileThresholdExceeded( + oldSchemaFilePaths.size + 1, + conf.maxStateSchemaFiles, + // need to compute symmetric diff between col family list + (newColFamilies.diff(oldColFamilies) ++ Review Comment: This symmetric difference is the set of column families that were either added or removed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org