HeartSaVioR commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1912873272


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala:
##########
@@ -201,7 +204,8 @@ trait TransformWithStateMetadataUtils extends Logging {
       stateSchemaVersion: Int,
       info: StatefulOperatorStateInfo,
       session: SparkSession,
-      operatorStateMetadataVersion: Int = 2): List[StateSchemaValidationResult] = {
+      operatorStateMetadataVersion: Int = 2,
+      stateStoreEncodingFormat: String = "unsaferow"): List[StateSchemaValidationResult] = {

Review Comment:
   Shall we use an Enum rather than just a string? Let's not leave the encoding format as a magic string constant.
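
   For illustration, a rough sketch of what an enum-based format could look like (the object and value names here are just a guess, not necessarily what the PR would use):

   ```scala
   // Hypothetical sketch: a Scala Enumeration so callers pass a typed value instead
   // of the raw "unsaferow"/"avro" strings; the conf string is parsed once at the
   // boundary via withName().
   object StateStoreEncoding extends Enumeration {
     val UnsafeRow: Value = Value("unsaferow")
     val Avro: Value = Value("avro")
   }

   // e.g. the signature above could then become:
   //   stateStoreEncodingFormat: StateStoreEncoding.Value = StateStoreEncoding.UnsafeRow
   // with StateStoreEncoding.withName(confValue) used where the conf is read.
   ```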



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -242,20 +246,48 @@ class IncrementalExecution(
                     log"versions: ${MDC(ERROR, e.getMessage)}")
                 None
               }
-              oldMetadata match {
-                case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata)
-                case None =>
-              }
+            } else {
+              None
+            }
+            val stateSchemaList = ssw.stateSchemaList(schemaValidationResult,
+              oldMetadata)
+            val metadata = ssw.operatorStateMetadata(stateSchemaList)
+            oldMetadata match {
+              case Some(oldMetadata) =>
+                ssw.validateNewMetadata(oldMetadata, metadata)
+              case None =>
             }
             val metadataWriter = OperatorStateMetadataWriter.createWriter(
                new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString),
                 hadoopConf,
                 ssw.operatorStateMetadataVersion,
                 Some(currentBatchId))
             metadataWriter.write(metadata)
-          case _ =>
+            if (ssw.supportsSchemaEvolution) {
+              val stateSchemaMetadata = StateSchemaMetadata.

Review Comment:
   nit: move `.` to the next line



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -51,6 +56,195 @@ sealed trait RocksDBValueStateEncoder {
   def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
 }
 
+trait StateSchemaProvider extends Serializable {
+  def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue
+
+  def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short
+}
+
+// Test implementation that can be dynamically updated
+class TestStateSchemaProvider extends StateSchemaProvider {
+  private val schemas = mutable.Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]
+
+  /**
+   * Captures a new schema pair (key schema and value schema) for a column family.
+   * Each capture creates two entries - one for the key schema and one for the value schema.
+   *
+   * @param colFamilyName Name of the column family
+   * @param keySchema Spark SQL schema for the key
+   * @param valueSchema Spark SQL schema for the value
+   * @param keySchemaId Schema ID for the key, defaults to 0
+   * @param valueSchemaId Schema ID for the value, defaults to 0
+   */
+  def captureSchema(
+      colFamilyName: String,
+      keySchema: StructType,
+      valueSchema: StructType,
+      keySchemaId: Short = 0,
+      valueSchemaId: Short = 0): Unit = {
+    schemas ++= Map(
+      StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) ->
+        StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)),
+      StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) ->
+        StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema))
+    )
+  }
+
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = {
+    schemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = {
+    schemas.keys
+      .filter { key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      }
+      .map(_.schemaId).max
+  }
+}
+
+class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider {
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = {
+    metadata.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = {
+    metadata.activeSchemas
+      .keys
+      .filter { key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      }
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Broadcasts schema metadata information for stateful operators in a streaming query.
+ *
+ * This class provides a way to distribute schema evolution information to all executors
+ * via Spark's broadcast mechanism. Each stateful operator in a streaming query maintains
+ * its own instance of this class to track schema versions and evolution.
+ *
+ * @param broadcast Spark broadcast variable containing the schema metadata
+ */
+case class StateSchemaBroadcast(
+    broadcast: Broadcast[StateSchemaMetadata]
+) extends Logging with StateSchemaProvider {
+
+  /**
+   * Retrieves the schema information for a given column family and schema version
+   *
+   * @param key A combination of column family name and schema ID
+   * @return The corresponding schema metadata value containing both SQL and Avro schemas
+   */
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = {
+    broadcast.value.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = {
+    broadcast.value.activeSchemas
+      .keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      )
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Contains schema evolution metadata for a stateful operator.
+ *
+ * @param activeSchemas Map of all active schema versions, keyed by column family and schema ID.
+ *                      This includes both the current schema and any previous schemas that
+ *                      may still exist in the state store.
+ */
+case class StateSchemaMetadata(
+    activeSchemas: Map[StateSchemaMetadataKey, StateSchemaMetadataValue]
+)
+
+object StateSchemaMetadata {
+
+  def createStateSchemaMetadata(
+      checkpointLocation: String,
+      hadoopConf: Configuration,
+      stateSchemaFiles: List[String]
+  ): StateSchemaMetadata = {
+    val fm = CheckpointFileManager.create(new Path(checkpointLocation), hadoopConf)
+
+    // Build up our map of schema metadata
+    val activeSchemas = stateSchemaFiles.zipWithIndex.foldLeft(
+      Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]) {
+      case (schemas, (stateSchemaFile, _)) =>
+        val fsDataInputStream = fm.open(new Path(stateSchemaFile))
+        val colFamilySchemas = StateSchemaCompatibilityChecker.readSchemaFile(fsDataInputStream)
+
+        // For each column family, create metadata entries for both key and value schemas
+        val schemaEntries = colFamilySchemas.flatMap { colFamilySchema =>
+          // Create key schema metadata
+          val keyAvroSchema = SchemaConverters.toAvroTypeWithDefaults(
+            colFamilySchema.keySchema)
+          val keyEntry = StateSchemaMetadataKey(
+            colFamilySchema.colFamilyName,
+            colFamilySchema.keySchemaId,
+            isKey = true
+          ) -> StateSchemaMetadataValue(
+            colFamilySchema.keySchema,
+            keyAvroSchema
+          )
+
+          // Create value schema metadata
+          val valueAvroSchema = SchemaConverters.toAvroTypeWithDefaults(
+            colFamilySchema.valueSchema)
+          val valueEntry = StateSchemaMetadataKey(
+            colFamilySchema.colFamilyName,
+            colFamilySchema.valueSchemaId,
+            isKey = false
+          ) -> StateSchemaMetadataValue(
+            colFamilySchema.valueSchema,
+            valueAvroSchema
+          )
+
+          Seq(keyEntry, valueEntry)
+        }
+
+        // Add new entries to our accumulated map
+        schemas ++ schemaEntries.toMap
+    }
+
+    // Create the final metadata
+    StateSchemaMetadata(activeSchemas = activeSchemas)

Review Comment:
   nit: the named param doesn't seem to add any benefit here.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -4501,6 +4509,14 @@
     ],
     "sqlState" : "42K06"
   },
+  "STATE_STORE_STATE_SCHEMA_FILES_THRESHOLD_EXCEEDED" : {
+    "message" : [
+      "The number of state schema files <numStateSchemaFiles> exceeds the maximum number of state schema files for this query: <maxStateSchemaFiles>.",
+      "Offending column families: <colFamilyNames>",
+      "Please set 'spark.sql.streaming.stateStore.stateSchemaFilesThreshold' to a higher number, or revert state schema modifications"

Review Comment:
   I think this is a short-term solution; we will eventually need to evict state schema versions that are no longer referenced by any state rows.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -468,7 +468,13 @@ class OperatorStateMetadataV2FileManager(
     // find the batchId of the earliest schema file we need to keep
     val earliestBatchToKeep = latestMetadata match {
       case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) =>
-        val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath
+        // Currently, this purging logic only handles operators with a single state store
+        // instance, which is why we choose the head of this array.
+        val ssInfo = stateStoreInfo.head
+        // The schema files in the OperatorStateMetadata are sorted
+        // by earliest to latest. The head of the schemaFileList is
+        // the schema file written for batch 0.

Review Comment:
   nit: not batch 0 but the earliest batch, if we ever evict these files? If we aren't evicting, it's simply always 0 and we don't need this.



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -4531,6 +4547,14 @@
     ],
     "sqlState" : "XX000"
   },
+  "STATE_STORE_VALUE_SCHEMA_EVOLUTION_THRESHOLD_EXCEEDED" : {

Review Comment:
   This is something users would be able to reason about. If we can correlate the above error with this one, we could phrase the error message in terms users understand.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -242,20 +246,48 @@ class IncrementalExecution(
                     log"versions: ${MDC(ERROR, e.getMessage)}")
                 None
               }
-              oldMetadata match {
-                case Some(oldMetadata) => ssw.validateNewMetadata(oldMetadata, metadata)
-                case None =>
-              }
+            } else {
+              None
+            }
+            val stateSchemaList = ssw.stateSchemaList(schemaValidationResult,
+              oldMetadata)
+            val metadata = ssw.operatorStateMetadata(stateSchemaList)
+            oldMetadata match {
+              case Some(oldMetadata) =>
+                ssw.validateNewMetadata(oldMetadata, metadata)
+              case None =>
             }
             val metadataWriter = OperatorStateMetadataWriter.createWriter(
                new Path(checkpointLocation, ssw.getStateInfo.operatorId.toString),

Review Comment:
   nit: shift 2 spaces (it's not related to this PR, but while we are here)



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2334,6 +2341,13 @@ object SQLConf {
         "Valid values are 'unsaferow' and 'avro'")
       .createWithDefault("unsaferow")
 
+  val STREAMING_VALUE_STATE_SCHEMA_EVOLUTION_THRESHOLD =
+    buildConf("spark.sql.streaming.stateStore.valueStateSchemaEvolutionThreshold")

Review Comment:
   ditto about internal



##########
common/utils/src/main/resources/error/error-conditions.json:
##########
@@ -4460,6 +4460,14 @@
     ],
     "sqlState" : "42K06"
   },
+  "STATE_STORE_INVALID_VALUE_SCHEMA_EVOLUTION" : {
+    "message" : [
+      "Schema evolution is not possible new value_schema=<newValueSchema>.",

Review Comment:
   Do we have information about the old schema? What's the error message from Avro? Let's make sure users can easily spot the issue and fix it.
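
   For instance, a rough sketch of what the error could carry (assuming the usual error-class constructor; the helper and parameter names are illustrative, not the PR's actual code):

   ```scala
   import org.apache.spark.SparkUnsupportedOperationException
   import org.apache.spark.sql.types.StructType

   // Hypothetical helper: surface the old schema and the underlying Avro
   // incompatibility alongside the new schema so users can see what changed.
   def invalidValueSchemaEvolution(
       oldValueSchema: StructType,
       newValueSchema: StructType,
       avroError: String): SparkUnsupportedOperationException = {
     new SparkUnsupportedOperationException(
       errorClass = "STATE_STORE_INVALID_VALUE_SCHEMA_EVOLUTION",
       messageParameters = Map(
         "oldValueSchema" -> oldValueSchema.json,
         "newValueSchema" -> newValueSchema.json,
         "avroError" -> avroError))
   }
   ```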



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -570,10 +602,10 @@ class IncrementalExecution(
 
       // The rule below doesn't change the plan but can cause the side effect 
that

Review Comment:
   This comment is no longer true, right? Let's update it.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2323,6 +2323,13 @@ object SQLConf {
       .intConf
       .createWithDefault(1)
 
+  val STREAMING_MAX_NUM_STATE_SCHEMA_FILES =
+    buildConf("spark.sql.streaming.stateStore.maxNumStateSchemaFiles")
+      .doc("The maximum number of StateSchemaV3 files allowed per operator")

Review Comment:
   Do we think this is something everyone will tune, or is this an advanced config that only experts would touch? If it's the latter, it's OK to mark this as internal (by calling `.internal()`).
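
   A minimal sketch of what that would look like, following the existing builder pattern in SQLConf (the doc wording and default here are illustrative, not the PR's actual values):

   ```scala
   val STREAMING_MAX_NUM_STATE_SCHEMA_FILES =
     buildConf("spark.sql.streaming.stateStore.maxNumStateSchemaFiles")
       .internal()
       .doc("Maximum number of state schema files kept per stateful operator. Each " +
         "state schema change adds one more schema file under the checkpoint location.")
       .intConf
       .createWithDefault(128) // illustrative default, not necessarily the PR's value
   ```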



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTransformWithStateSuite.scala:
##########
@@ -128,7 +128,7 @@ class SessionGroupsStatefulProcessorWithTTL extends
  */
 @SlowSQLTest
 class StateDataSourceTransformWithStateSuite extends StateStoreMetricsTest
-  with AlsoTestWithRocksDBFeatures with AlsoTestWithEncodingTypes {
+  with AlsoTestWithEncodingTypes with AlsoTestWithRocksDBFeatures {

Review Comment:
   nit: specific reason to flip them?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -233,6 +234,33 @@ trait StateStoreWriter
     }
   }
 
+  def stateSchemaList(
+      stateSchemaValidationResults: List[StateSchemaValidationResult],
+      oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = {
+
+    def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = {
+      val ssInfo = metadata.stateStoreInfo.head
+      ssInfo.stateSchemaFilePaths
+    }
+
+    val validationResult = stateSchemaValidationResults.head

Review Comment:
   Does this mean it won't work with stream-stream join? Please always add a TODO comment if you are adding common logic that does not work with some operators.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2334,6 +2341,13 @@ object SQLConf {
         "Valid values are 'unsaferow' and 'avro'")
       .createWithDefault("unsaferow")
 
+  val STREAMING_VALUE_STATE_SCHEMA_EVOLUTION_THRESHOLD =

Review Comment:
   Thanks for clarifying; this means we are not evicting the old state schemas, right? I guess it makes sense since there is no guarantee that the eviction of state rows would ever happen, e.g. in complete mode.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala:
##########
@@ -233,6 +234,33 @@ trait StateStoreWriter
     }
   }
 
+  def stateSchemaList(
+      stateSchemaValidationResults: List[StateSchemaValidationResult],
+      oldMetadata: Option[OperatorStateMetadata]): List[List[String]] = {
+
+    def getExistingStateInfo(metadata: OperatorStateMetadataV2): List[String] = {
+      val ssInfo = metadata.stateStoreInfo.head
+      ssInfo.stateSchemaFilePaths
+    }
+
+    val validationResult = stateSchemaValidationResults.head
+
+    oldMetadata match {
+      case Some(v2: OperatorStateMetadataV2) =>
+        val oldSchemaPaths = getExistingStateInfo(v2)
+        if (validationResult.evolvedSchema) {
+          List(oldSchemaPaths ++ List(validationResult.schemaPath))

Review Comment:
   If we assume multiple state stores, it'd be safer to associate schema paths with the store name instead of just an index. If we come to handle stream-stream join, we either end up relying on a strict ordering of state stores or we change this to a map. Personally I'm in favor of the latter - there are already too many things to memorize.
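
   Roughly what the map-keyed variant could look like (a sketch only; field names like `storeName` and `stateSchemaFilePaths` follow what the PR appears to use, and the method shape is not the PR's actual code):

   ```scala
   // Hypothetical sketch: key validation results and schema file paths by store name,
   // so multi-store operators (e.g. stream-stream join) don't rely on positional order.
   def stateSchemaList(
       stateSchemaValidationResults: Map[String, StateSchemaValidationResult],
       oldMetadata: Option[OperatorStateMetadata]): Map[String, List[String]] = {
     stateSchemaValidationResults.map { case (storeName, validationResult) =>
       val oldSchemaPaths = oldMetadata match {
         case Some(v2: OperatorStateMetadataV2) =>
           v2.stateStoreInfo.find(_.storeName == storeName)
             .map(_.stateSchemaFilePaths).getOrElse(List.empty)
         case _ => List.empty
       }
       val newSchemaPaths =
         if (validationResult.evolvedSchema) oldSchemaPaths :+ validationResult.schemaPath
         else if (oldSchemaPaths.nonEmpty) oldSchemaPaths
         else List(validationResult.schemaPath)
       storeName -> newSchemaPaths
     }
   }
   ```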



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -2323,6 +2323,13 @@ object SQLConf {
       .intConf
       .createWithDefault(1)
 
+  val STREAMING_MAX_NUM_STATE_SCHEMA_FILES =
+    buildConf("spark.sql.streaming.stateStore.maxNumStateSchemaFiles")
+      .doc("The maximum number of StateSchemaV3 files allowed per operator")

Review Comment:
   Also, let's think about how a customer would interpret this. I would imagine no one knows what StateSchemaV3 files are.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -468,7 +468,8 @@ class OperatorStateMetadataV2FileManager(
     // find the batchId of the earliest schema file we need to keep
     val earliestBatchToKeep = latestMetadata match {
       case Some(OperatorStateMetadataV2(_, stateStoreInfo, _)) =>
-        val schemaFilePath = stateStoreInfo.head.stateSchemaFilePath
+        val ssInfo = stateStoreInfo.head

Review Comment:
   Your comment above does not seem to match what you wrote in the code comment.
   
   If the code comment is correct, shall we leave a TODO to address this when we add stream-stream join to this case?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -51,6 +54,187 @@ sealed trait RocksDBValueStateEncoder {
   def decodeValues(valueBytes: Array[Byte]): Iterator[UnsafeRow]
 }
 
+trait StateSchemaProvider extends Serializable {
+  def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue
+
+  def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short
+}
+
+// Test implementation that can be dynamically updated
+class TestStateSchemaProvider extends StateSchemaProvider {
+  private var schemas = Map.empty[StateSchemaMetadataKey, StateSchemaMetadataValue]
+
+  def addSchema(
+      colFamilyName: String,
+      keySchema: StructType,
+      valueSchema: StructType,
+      keySchemaId: Short = 0,
+      valueSchemaId: Short = 0): Unit = {
+    schemas ++= Map(
+      StateSchemaMetadataKey(colFamilyName, keySchemaId, isKey = true) ->
+        StateSchemaMetadataValue(keySchema, SchemaConverters.toAvroTypeWithDefaults(keySchema)),
+      StateSchemaMetadataKey(colFamilyName, valueSchemaId, isKey = false) ->
+        StateSchemaMetadataValue(valueSchema, SchemaConverters.toAvroTypeWithDefaults(valueSchema))
+    )
+  }
+
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = {
+    schemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = {
+    schemas.keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey)
+      .map(_.schemaId).max
+  }
+}
+
+class InMemoryStateSchemaProvider(metadata: StateSchemaMetadata) extends StateSchemaProvider {
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = {
+    metadata.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = {
+    metadata.activeSchemas
+      .keys
+      .filter(key =>
+        key.colFamilyName == colFamilyName &&
+          key.isKey == isKey
+      )
+      .map(_.schemaId).max
+  }
+}
+
+/**
+ * Broadcasts schema metadata information for stateful operators in a streaming query.
+ *
+ * This class provides a way to distribute schema evolution information to all executors
+ * via Spark's broadcast mechanism. Each stateful operator in a streaming query maintains
+ * its own instance of this class to track schema versions and evolution.
+ *
+ * @param broadcast Spark broadcast variable containing the schema metadata
+ */
+case class StateSchemaBroadcast(
+    broadcast: Broadcast[StateSchemaMetadata]
+) extends Logging with StateSchemaProvider {
+
+  /**
+   * Retrieves the schema information for a given column family and schema version
+   *
+   * @param key A combination of column family name and schema ID
+   * @return The corresponding schema metadata value containing both SQL and Avro schemas
+   */
+  override def getSchemaMetadataValue(key: StateSchemaMetadataKey): StateSchemaMetadataValue = {
+    broadcast.value.activeSchemas(key)
+  }
+
+  override def getCurrentStateSchemaId(colFamilyName: String, isKey: Boolean): Short = {
+    broadcast.value.activeSchemas
+      .keys
+      .filter(key =>

Review Comment:
   This is yet to be addressed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

