HeartSaVioR commented on code in PR #49277: URL: https://github.com/apache/spark/pull/49277#discussion_r1912873272
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala: ########## @@ -201,7 +204,8 @@ trait TransformWithStateMetadataUtils extends Logging { stateSchemaVersion: Int, info: StatefulOperatorStateInfo, session: SparkSession, - operatorStateMetadataVersion: Int = 2): List[StateSchemaValidationResult] = { + operatorStateMetadataVersion: Int = 2, + stateStoreEncodingFormat: String = "unsaferow"): List[StateSchemaValidationResult] = { Review Comment: Shall we use Enum rather than just string? Let's not leave the encoding format string as magic constant. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -242,20 +246,48 @@ class IncrementalExecution( log"versions: ${MDC(ERROR, e.getMessage)}") None } - oldMetadata match { - case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata) - case None => - } + } else { + None + } + val stateSchemaList = ssw.stateSchemaList(schemaValidationResult, + oldMetadata) + val metadata = ssw.operatorStateMetadata(stateSchemaList) + oldMetadata match { + case Some(oldMetadata) => + ssw.validateNewMetadata(oldMetadata, metadata) + case None => } val metadataWriter = OperatorStateMetadataWriter.createWriter( new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString), hadoopConf, ssw.operatorStateMetadataVersion, Some(currentBatchId)) metadataWriter.write(metadata) - case _ => + if (ssw.supportsSchemaEvolution) { + val stateSchemaMetadata = StateSchemaMetadata. Review Comment: nit: move `.` to the next line ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -51,6 +56,195 @@ sealed trait RocksDBValueStateEncoder { def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] } +trait StateSchemaProvider extends Serializable { + def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue + + def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short +} + +// Test implementation that can be dynamically updated +class TestStateSchemaProvider extends StateSchemaProvider { + private val schemas = mutable.Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue] + + /** + * Captures a new schema pair (key schema and value schema) for a column family. + * Each capture creates two entries - one for the key schema and one for the value schema. 
+ * + * @param colFamilyName Name of the column family + * @param keySchema Spark SQL schema for the key + * @param valueSchema Spark SQL schema for the value + * @param keySchemaId Schema ID for the key, defaults to 0 + * @param valueSchemaId Schema ID for the value, defaults to 0 + */ + def captureSchema( + colFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keySchemaId: Short = 0, + valueSchemaId: Short = 0): Unit = { + schemas ++= Map( + StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) -> + StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)), + StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) -> + StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema)) + ) + } + + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + schemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + schemas.keys + .filter { key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + } + .map(_.schemaId).max + } +} + +class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider { + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + metadata.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + metadata.activeSchemas + .keys + .filter { key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + } + .map(_.schemaId).max + } +} + +/** + * Broadcasts schema metadata information for stateful operators in a streaming query. + * + * This class provides a way to distribute schema evolution information to all executors + * via Spark's broadcast mechanism. Each stateful operator in a streaming query maintains + * its own instance of this class to track schema versions and evolution. + * + * @param broadcast Spark broadcast variable containing the schema metadata + */ +case class StateSchemaBroadcast( + broadcast: Broadcast[StateSchemaMetadata] +) extends Logging with StateSchemaProvider { + + /** + * Retrieves the schema information for a given column family and schema version + * + * @param key A combination of column family name and schema ID + * @return The corresponding schema metadata value containing both SQL and Avro schemas + */ + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + broadcast.value.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + broadcast.value.activeSchemas + .keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + ) + .map(_.schemaId).max + } +} + +/** + * Contains schema evolution metadata for a stateful operator. + * + * @param activeSchemas Map of all active schema versions, keyed by column family and schema ID. + * This includes both the current schema and any previous schemas that + * may still exist in the state store. 
+ */ +case class StateSchemaMetadata( + activeSchemas: Map[StateSchemaMetadataKey, StateSchemaMetadataValue] +) + +object StateSchemaMetadata { + + def createStateSchemaMetadata( + checkpointLocation: String, + hadoopConf: Configuration, + stateSchemaFiles: List[String] + ): StateSchemaMetadata = { + val fm = CheckpointFileManager.create(new Path(checkpointLocation), hadoopConf) + + // Build up our map of schema metadata + val activeSchemas = stateSchemaFiles.zipWithIndex.foldLeft( + Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]) { + case (schemas, (stateSchemaFile, _)) => + val fsDataInputStream = fm.open(new Path(stateSchemaFile)) + val colFamilySchemas = StateSchemaCompatibilityChecker.readSchemaFile(fsDataInputStream) + + // For each column family, create metadata entries for both key and value schemas + val schemaEntries = colFamilySchemas.flatMap { colFamilySchema => + // Create key schema metadata + val keyAvroSchema = SchemaConverters.toAvroTypeWithDefaults( + colFamilySchema.keySchema) + val keyEntry = StateSchemaMetadataKey( + colFamilySchema.colFamilyName, + colFamilySchema.keySchemaId, + isKey = true + ) -> StateSchemaMetadataValue( + colFamilySchema.keySchema, + keyAvroSchema + ) + + // Create value schema metadata + val valueAvroSchema = SchemaConverters.toAvroTypeWithDefaults( + colFamilySchema.valueSchema) + val valueEntry = StateSchemaMetadataKey( + colFamilySchema.colFamilyName, + colFamilySchema.valueSchemaId, + isKey = false + ) -> StateSchemaMetadataValue( + colFamilySchema.valueSchema, + valueAvroSchema + ) + + Seq(keyEntry, valueEntry) + } + + // Add new entries to our accumulated map + schemas ++ schemaEntries.toMap + } + + // Create the final metadata + StateSchemaMetadata(activeSchemas = activeSchemas) Review Comment: nit: doesn't seem to have benefit on named param ########## common/utils/src/main/resources/error/error-conditions.json: ########## @@ -4501,6 +4509,14 @@ ], "sqlState" : "42K06" }, + "STATE_STORE_STATE_SCHEMA_FILES_THRESHOLD_EXCEEDED" : { + "message" : [ + "The number of state schema files <numStateSchemaFiles> exceeds the maximum number of state schema files for this query: <maxStateSchemaFiles>.", + "Offending column families: <colFamilyNames>", + "Please set 'spark.sql.streaming.stateStore.stateSchemaFilesThreshold' to a higher number, or revert state schema modifications" Review Comment: I think this is short term solution and we will eventually need to evict state schema versions which are no longer referred from state rows. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala: ########## @@ -468,7 +468,13 @@ class OperatorStateMetadataV2FileManager( // find the batchId of the earliest schema file we need to keep val earliestBatchToKeep = latestMetadata match { case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) => - val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath + // Currently, this purging logic only handles operators with a single state store + // instance, which is why we choose the head of this array. + val ssInfo = stateStoreInfo.head + // The schema files in the OperatorStateMetadata are sorted + // by earliest to latest. The head of the schemaFileList is + // the schema file written for batch 0. Review Comment: nit: not batch 0 but the earliest batch if we are ever evicting them? If we aren't evicting, it's simply always 0 and we don't need to do this. 
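For illustration, the enum suggested near the top of this thread for the state store encoding format could look roughly like the sketch below; all names here are made up for the example and are not taken from the PR.

```scala
// Illustrative sketch only, not from the PR: a typed replacement for the raw
// "unsaferow"/"avro" strings. All names below are hypothetical.
sealed trait StateStoreEncoding {
  def name: String
}

object StateStoreEncoding {
  case object UnsafeRow extends StateStoreEncoding { override val name = "unsaferow" }
  case object Avro extends StateStoreEncoding { override val name = "avro" }

  private val values: Seq[StateStoreEncoding] = Seq(UnsafeRow, Avro)

  // Parse the user-facing config value, failing fast on anything unexpected.
  def fromString(value: String): StateStoreEncoding =
    values.find(_.name == value.toLowerCase).getOrElse {
      throw new IllegalArgumentException(s"Unknown state store encoding format: $value")
    }
}
```

The `stateStoreEncodingFormat` parameter (and the conf default) could then carry the enum type rather than a `String`, so the "unsaferow" literal lives in exactly one place.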
########## common/utils/src/main/resources/error/error-conditions.json: ########## @@ -4531,6 +4547,14 @@ ], "sqlState" : "XX000" }, + "STATE_STORE_VALUE_SCHEMA_EVOLUTION_THRESHOLD_EXCEEDED" : { Review Comment: This is something users would be able to reason about. If we can correlate the above with this, we could provide the error message in users' language. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -242,20 +246,48 @@ class IncrementalExecution( log"versions: ${MDC(ERROR, e.getMessage)}") None } - oldMetadata match { - case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata) - case None => - } + } else { + None + } + val stateSchemaList = ssw.stateSchemaList(schemaValidationResult, + oldMetadata) + val metadata = ssw.operatorStateMetadata(stateSchemaList) + oldMetadata match { + case Some(oldMetadata) => + ssw.validateNewMetadata(oldMetadata, metadata) + case None => } val metadataWriter = OperatorStateMetadataWriter.createWriter( new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString), Review Comment: nit: shift 2 spaces (it's not related to this PR, but while we are here) ########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2334,6 +2341,13 @@ object SQLConf { "Valid values are 'unsaferow' and 'avro'") .createWithDefault("unsaferow") + val STREAMING_VALUE_STATE_SCHEMA_EVOLUTION_THRESHOLD = + buildConf("spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold") Review Comment: ditto about internal ########## common/utils/src/main/resources/error/error-conditions.json: ########## @@ -4460,6 +4460,14 @@ ], "sqlState" : "42K06" }, + "STATE_STORE_INVALID_VALUE_SCHEMA_EVOLUTION" : { + "message" : [ + "Schema evolution is not possible new value_schema=<newValueSchema>.", Review Comment: Do we have information about the old schema? What's the error message from Avro? Let's make sure users can easily spot the issue and fix it. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ########## @@ -570,10 +602,10 @@ class IncrementalExecution( // The rule below doesn't change the plan but can cause the side effect that Review Comment: This comment is no longer true, right? Let's update it. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2323,6 +2323,13 @@ object SQLConf { .intConf .createWithDefault(1) + val STREAMING_MAX_NUM_STATE_SCHEMA_FILES = + buildConf("spark.sql.streaming.stateStore.maxNumStateSchemaFiles") + .doc("The maximum number of StateSchemaV3 files allowed per operator") Review Comment: Do we think this is something everyone will tune, or is this an advanced config that only experts would touch? If it's the latter, it's OK to mark this as internal (by calling `.internal()`). ########## sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala: ########## @@ -128,7 +128,7 @@ class SessionGroupsStatefulProcessorWithTTL extends */ @SlowSQLTest class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest - with AlsoTestWithRocksDBFeatures with AlsoTestWithEncodingTypes { + with AlsoTestWithEncodingTypes with AlsoTestWithRocksDBFeatures { Review Comment: nit: any specific reason to flip them?
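As a concrete reference for the `.internal()` suggestion above, a minimal sketch of how the config could be marked internal, assuming it stays inside `object SQLConf`; the doc wording and default value are placeholders, not what the PR settles on.

```scala
// Sketch of the `.internal()` suggestion, assuming this stays inside `object SQLConf`.
// Marking the conf internal keeps it out of user-facing documentation while still
// allowing advanced users to set it.
val STREAMING_MAX_NUM_STATE_SCHEMA_FILES =
  buildConf("spark.sql.streaming.stateStore.maxNumStateSchemaFiles")
    .internal()
    .doc("The maximum number of state schema files tracked per stateful operator. " +
      "Exceeding it fails the query with STATE_STORE_STATE_SCHEMA_FILES_THRESHOLD_EXCEEDED.")
    .intConf
    .createWithDefault(128) // placeholder default, not taken from the PR
```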
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -233,6 +234,33 @@ trait StateStoreWriter } } + def stateSchemaList( + stateSchemaValidationResults: List[StateSchemaValidationResult], + oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = { + + def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = { + val ssInfo = metadata.stateStoreInfo.head + ssInfo.stateSchemaFilePaths + } + + val validationResult = stateSchemaValidationResults.head Review Comment: Does this mean it won't work with stream-stream join? Please always put TODO comment if you are adding some common logic which does not work with some operator. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2334,6 +2341,13 @@ object SQLConf { "Valid values are 'unsaferow' and 'avro'") .createWithDefault("unsaferow") + val STREAMING_VALUE_STATE_SCHEMA_EVOLUTION_THRESHOLD = Review Comment: Thanks for clarifying; this means we are not evicting the old state schema, right? I guess it makes sense since there is no guarantee that the eviction of state rows would happen e.g. complete mode. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala: ########## @@ -233,6 +234,33 @@ trait StateStoreWriter } } + def stateSchemaList( + stateSchemaValidationResults: List[StateSchemaValidationResult], + oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = { + + def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = { + val ssInfo = metadata.stateStoreInfo.head + ssInfo.stateSchemaFilePaths + } + + val validationResult = stateSchemaValidationResults.head + + oldMetadata match { + case Some(v2: OperatorStateMetadataV2) => + val oldSchemaPaths = getExistingStateInfo(v2) + if (validationResult.evolvedSchema) { + List(oldSchemaPaths ++ List(validationResult.schemaPath)) Review Comment: If we assume multiple state stores, it'd be safer to associate with store name instead of just index. If we come to handle stream-stream join, either we end up with strict ordering of state stores or we will change this to map. Personally I'm in favor of latter - it's already too many things to memorize. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ########## @@ -2323,6 +2323,13 @@ object SQLConf { .intConf .createWithDefault(1) + val STREAMING_MAX_NUM_STATE_SCHEMA_FILES = + buildConf("spark.sql.streaming.stateStore.maxNumStateSchemaFiles") + .doc("The maximum number of StateSchemaV3 files allowed per operator") Review Comment: Also, let's think how customer would interpret this. I would imagine no one would know about StateSchemaV3 files. ########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala: ########## @@ -468,7 +468,8 @@ class OperatorStateMetadataV2FileManager( // find the batchId of the earliest schema file we need to keep val earliestBatchToKeep = latestMetadata match { case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) => - val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath + val ssInfo = stateStoreInfo.head Review Comment: Your above comment does not seem to be the same with what you commented in the code. If this is the case (code comment is correct), shall we leave TODO to address when we add stream-stream join into the case? 
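On the preference above for a map keyed by state store name (rather than a positional list), a rough sketch of the shape that could take; the helper name and result type are hypothetical, while `StateSchemaValidationResult` and its fields are the ones used in the diff.

```scala
// Hypothetical sketch of the map-based shape favored above: schema file paths keyed
// by state store name instead of positional index, so operators with multiple state
// stores (e.g. stream-stream join) do not depend on a strict ordering.
// StateSchemaValidationResult(evolvedSchema, schemaPath) is the class used in the diff.
def stateSchemaPathsByStore(
    validationResults: Map[String, StateSchemaValidationResult],
    oldSchemaPaths: Map[String, List[String]]): Map[String, List[String]] = {
  validationResults.map { case (storeName, result) =>
    val existing = oldSchemaPaths.getOrElse(storeName, Nil)
    val updated =
      if (result.evolvedSchema) existing :+ result.schemaPath // append the newly written file
      else if (existing.nonEmpty) existing                    // no evolution: keep prior paths
      else List(result.schemaPath)                            // first batch for this store
    storeName -> updated
  }
}
```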
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala: ########## @@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder { def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow] } +trait StateSchemaProvider extends Serializable { + def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue + + def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short +} + +// Test implementation that can be dynamically updated +class TestStateSchemaProvider extends StateSchemaProvider { + private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue] + + def addSchema( + colFamilyName: String, + keySchema: StructType, + valueSchema: StructType, + keySchemaId: Short = 0, + valueSchemaId: Short = 0): Unit = { + schemas ++= Map( + StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) -> + StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)), + StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) -> + StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema)) + ) + } + + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + schemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + schemas.keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey) + .map(_.schemaId).max + } +} + +class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider { + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + metadata.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + metadata.activeSchemas + .keys + .filter(key => + key.colFamilyName == colFamilyName && + key.isKey == isKey + ) + .map(_.schemaId).max + } +} + +/** + * Broadcasts schema metadata information for stateful operators in a streaming query. + * + * This class provides a way to distribute schema evolution information to all executors + * via Spark's broadcast mechanism. Each stateful operator in a streaming query maintains + * its own instance of this class to track schema versions and evolution. + * + * @param broadcast Spark broadcast variable containing the schema metadata + */ +case class StateSchemaBroadcast( + broadcast: Broadcast[StateSchemaMetadata] +) extends Logging with StateSchemaProvider { + + /** + * Retrieves the schema information for a given column family and schema version + * + * @param key A combination of column family name and schema ID + * @return The corresponding schema metadata value containing both SQL and Avro schemas + */ + override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = { + broadcast.value.activeSchemas(key) + } + + override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = { + broadcast.value.activeSchemas + .keys + .filter(key => Review Comment: This is yet to be addressed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
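To round out the schema-provider discussion from RocksDBStateEncoder.scala, a hedged usage sketch of how the pieces could fit together; `spark`, `checkpointLocation`, `hadoopConf`, and `stateSchemaFiles` are assumed to be in scope, and the types come from the diffs above.

```scala
// Hedged usage sketch: the driver builds StateSchemaMetadata once from the schema files,
// wraps it in a broadcast-backed provider, and executors resolve the SQL/Avro schema pair
// for a (column family, schema id) key.
val metadata: StateSchemaMetadata = StateSchemaMetadata.createStateSchemaMetadata(
  checkpointLocation, hadoopConf, stateSchemaFiles)

val provider: StateSchemaProvider =
  StateSchemaBroadcast(spark.sparkContext.broadcast(metadata))

// Look up the newest value-schema id for the default column family, then fetch the
// matching schemas for encoding/decoding.
val latestValueSchemaId = provider.getCurrentStateSchemaId("default", isKey = false)
val valueSchemas = provider.getSchemaMetadataValue(
  StateSchemaMetadataKey("default", latestValueSchemaId, isKey = false))
```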