HeartSaVioR commented on code in PR #47524:
URL: https://github.com/apache/spark/pull/47524#discussion_r1710830978


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -166,21 +170,43 @@ class StateSchemaCompatibilityChecker(
   def validateAndMaybeEvolveStateSchema(
       newStateSchema: List[StateStoreColFamilySchema],
       ignoreValueSchema: Boolean,
-      stateSchemaVersion: Int): Boolean = {
+      stateSchemaVersion: Int): (Boolean, List[StateStoreColFamilySchema]) = {
     val existingStateSchemaList = getExistingKeyAndValueSchema().sortBy(_.colFamilyName)
-    val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName)
+    // assign colFamilyIds based on position in list
+    var maxId: Short = existingStateSchemaList.map(_.colFamilyId).maxOption.getOrElse(0)
 
     if (existingStateSchemaList.isEmpty) {
       // write the schema file if it doesn't exist
+      val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName).zipWithIndex.map {
+        case (schema, index) =>
+          schema.copy(colFamilyId = (index + 1).toShort)
+      }
       createSchemaFile(newStateSchemaList, stateSchemaVersion)
-      true
+      (true, newStateSchemaList)
     } else {
-      // validate if the new schema is compatible with the existing schema
-      existingStateSchemaList.lazyZip(newStateSchemaList).foreach {
-        case (existingStateSchema, newStateSchema) =>
-          check(existingStateSchema, newStateSchema, ignoreValueSchema)
+      // create a hashmap of name to schema from existing state schema list
+      val existingSchemas = existingStateSchemaList.map { schema =>
+        schema.colFamilyName -> schema
+      }.toMap
+      // For each column family we are creating, check if it existed in the
+      // previous run, and assign its ID to the old ID if the schema hasn't changed.
+      // If the schema has evolved, assign it a new ID.

Review Comment:
   I don't quite understand this part, but I might be missing something.
   
   1. Here we use `check()`, which actually checks schema "compatibility" rather than handling schema evolution. (We only allow changes to a column's name and nullability in state.) I suspect the schema-evolution spec we want for transformWithState is more flexible than what `check()` enforces.
   2. If we assign a new ID, wouldn't we lose the existing data, i.e., data loss? Where do we migrate the existing values from the old ID to the new ID? If there is no such migration, shall we defer this change until we truly support schema evolution?
   
   We even return `false` to indicate to the caller that there was no schema evolution, yet what we are doing here is quite different.
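
   For reference, here is the sort of check I mean, as a minimal hypothetical sketch (this is not the actual `check()` implementation): compatibility meaning "same field types in the same positions, names may differ, nullability may only be relaxed".
   
   ```scala
   import org.apache.spark.sql.types.StructType
   
   // Hypothetical helper; shallow comparison only (nested structs would need a
   // name-insensitive recursion). Illustrates compatibility vs. evolution.
   def schemasCompatible(existing: StructType, candidate: StructType): Boolean = {
     existing.length == candidate.length &&
       existing.fields.zip(candidate.fields).forall { case (e, c) =>
         e.dataType == c.dataType &&     // field types must match exactly
           (c.nullable || !e.nullable)   // nullability may relax, not tighten
       }
   }
   ```
   
   Anything beyond that (adding, dropping, or retyping a column) is what I'd call actual schema evolution, and that's where the new-ID-without-migration concern bites.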



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -129,6 +129,7 @@ trait StateStore extends ReadStateStore {
    */
   def createColFamilyIfAbsent(
       colFamilyName: String,
+      columnFamilyId: Short,

Review Comment:
   I guess this isn't happening now (it could happen once you add migration logic), but what if we re-assign the CF ID for the same CF name due to a schema change? Will we perform `removeColFamilyIfExists` and then `createColFamilyIfAbsent`? Isn't it dangerous to allow a call with the same CF name but a different CF ID without removing the existing CF?
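
   Something like this is what I'd expect as a guard (hypothetical sketch, not the current code; `removeColFamilyIfExists` is the existing API):
   
   ```scala
   import scala.collection.mutable
   
   // Refuse to silently remap an existing CF name to a different ID, forcing an
   // explicit remove + create so the old ID's data is never left dangling.
   def registerColFamilyId(
       idMap: mutable.Map[String, Short],
       colFamilyName: String,
       colFamilyId: Short): Unit = {
     idMap.get(colFamilyName) match {
       case Some(existing) if existing != colFamilyId =>
         throw new IllegalStateException(
           s"Column family '$colFamilyName' already maps to ID $existing; " +
             s"remove it before re-creating it with ID $colFamilyId")
       case _ => idMap.put(colFamilyName, colFamilyId)
     }
   }
   ```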



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -166,21 +170,43 @@ class StateSchemaCompatibilityChecker(
   def validateAndMaybeEvolveStateSchema(
       newStateSchema: List[StateStoreColFamilySchema],
       ignoreValueSchema: Boolean,
-      stateSchemaVersion: Int): Boolean = {
+      stateSchemaVersion: Int): (Boolean, List[StateStoreColFamilySchema]) = {
     val existingStateSchemaList = getExistingKeyAndValueSchema().sortBy(_.colFamilyName)
-    val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName)
+    // assign colFamilyIds based on position in list

Review Comment:
   This code comment is misleading; it only applies when `existingStateSchemaList` is empty, right? Please move it to the proper place.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala:
##########
@@ -234,7 +253,8 @@ class MapStateSuite extends StateVariableSuiteBase {
       val batchTimestampMs = 10
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TimeMode.ProcessingTime(), batchTimestampMs = Some(batchTimestampMs))
+        TimeMode.ProcessingTime(), batchTimestampMs = Some(batchTimestampMs),
+        columnFamilyIds = Map("testState" -> 1.toShort))

Review Comment:
   Missing `_ttl_testState`? It doesn't seem to matter for the test result, though.
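
   i.e., something like this (the TTL CF's ID value is just my assumption):
   
   ```scala
   columnFamilyIds = Map("testState" -> 1.toShort, "_ttl_testState" -> 2.toShort)
   ```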



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -86,7 +86,8 @@ class StatefulProcessorHandleImpl(
     timeMode: TimeMode,
     isStreaming: Boolean = true,
     batchTimestampMs: Option[Long] = None,
-    metrics: Map[String, SQLMetric] = Map.empty)
+    metrics: Map[String, SQLMetric] = Map.empty,
+    columnFamilyIds: Map[String, Short] = Map.empty)

Review Comment:
   nit: maybe add `@param columnFamilyIds` to the class doc and fill in an explanation? It's probably odd for only this one to be missing.
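
   Perhaps something along these lines (wording is only a suggestion):
   
   ```scala
    * @param columnFamilyIds Mapping from state variable (column family) name to
    *                        the ID assigned on the driver, so that executors
    *                        create column families with consistent IDs.
   ```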



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -144,8 +147,16 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     test(s"registering timeouts with timeMode=$timeMode should succeed") {
      tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
         val store = provider.getStore(0)
+        val timerCFName = if (timeMode == "ProcessingTime") {

Review Comment:
   Why not extract this into a private method that provides the two CF names for a given timeMode? It looks like we are introducing a non-trivial amount of duplicated code.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -129,6 +129,7 @@ trait StateStore extends ReadStateStore {
    */
   def createColFamilyIfAbsent(
       colFamilyName: String,
+      columnFamilyId: Short,

Review Comment:
   Also, please update the method doc to describe the semantics of the behavior around the CF ID. It used to be obvious and it no longer is. (It still follows the existing behavior, but it's confusing and dangerous since the CF ID is the "real" key.)
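
   For instance (suggested wording only; the open question is the part in the note):
   
   ```scala
   /**
    * Create the column family if it does not yet exist, registering it under
    * `columnFamilyId`. NOTE: if `colFamilyName` already exists with a different
    * ID, ... <we need to define this: error out, or remove-and-recreate?>
    */
   ```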



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -58,16 +58,19 @@ private[sql] class RocksDBStateStoreProvider
 
     override def createColFamilyIfAbsent(
         colFamilyName: String,
+        colFamilyId: Short,
         keySchema: StructType,
         valueSchema: StructType,
         keyStateEncoderSpec: KeyStateEncoderSpec,
         useMultipleValuesPerKey: Boolean = false,
         isInternal: Boolean = false): Unit = {
-      val newColFamilyId = ColumnFamilyUtils.createColFamilyIfAbsent(colFamilyName, isInternal)
-
+      ColumnFamilyUtils.
+        verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName, isInternal)
+      colFamilyNameToIdMap.put(colFamilyName, colFamilyId)

Review Comment:
   As I commented on the StateStore trait, we need to revisit the semantics here. If we blindly allow this overwrite, we lose access to the existing data (a dangling pointer, so to speak), meaning data loss.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -242,7 +243,20 @@ class IncrementalExecution(
             metadataWriter.write(metadata)
           case _ =>
         }
-        statefulOp
+        statefulOp match {
+          case tws: TransformWithStateExec =>

Review Comment:
   It doesn't look like we need to scope this to TransformWithStateExec, since every stateful operator goes through the same code path. I know it's only effective for TransformWithStateExec, but this gives us fewer code branches.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala:
##########
@@ -703,6 +706,24 @@ object StreamExecution {
     "py4j.protocol.Py4JJavaError: An error occurred while calling" +
       s"((.|\\r\\n|\\r|\\n)*)(${IO_EXCEPTION_NAMES.mkString("|")})").r
 
+  private val columnFamilySchemas:

Review Comment:
   Is there a reason this has to be a single global cache? A StreamExecution instance is created per query, so once we persist this in StreamExecution and have IncrementalExecution update it back, there doesn't seem to be any reason for a "global" cache. That also removes the need to think about cleanup.
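
   Roughly what I have in mind (sketch only; the names here are my assumptions, not the PR's code):
   
   ```scala
   import scala.collection.mutable
   
   // Lives on the per-query StreamExecution instance: IncrementalExecution
   // writes into it, and it dies with the query, so no cleanup is needed.
   class StreamExecutionSketch {
     private val columnFamilyIdsByOperator =
       mutable.Map.empty[Long, Map[String, Short]] // operator id -> (CF name -> CF id)
   
     def updateColumnFamilyIds(operatorId: Long, ids: Map[String, Short]): Unit =
       columnFamilyIdsByOperator.put(operatorId, ids)
   
     def columnFamilyIdsFor(operatorId: Long): Option[Map[String, Short]] =
       columnFamilyIdsByOperator.get(operatorId)
   }
   ```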



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -902,7 +902,8 @@ class TransformWithStateSuite extends StateStoreMetricsTest
           keySchema,
           new StructType().add("value", LongType, false),
           Some(NoPrefixKeyStateEncoderSpec(keySchema)),
-          None
+          None,
+          1.toShort

Review Comment:
   We predicted the future? :) The state names are alphabetically sorted.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -667,12 +678,35 @@ object TransformWithStateExec {
       initialStateDeserializer: Expression,
       initialState: SparkPlan): SparkPlan = {
     val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+
+    // We are assigning the columnFamilyIds based on their alphabetic order
+    // in the batch world since there is no notion of restarts, and we don't need
+    // the columnFamilyIds here to be consistent between runs
+    val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(timeMode, keyEncoder)
+    driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT)
+    statefulProcessor.setHandle(driverProcessorHandle)
+    statefulProcessor.init(outputMode, timeMode)
+    if (timeMode != NoTime) {
+      driverProcessorHandle.registerTimer(0L)
+    }
+    val columnFamilySchemas = driverProcessorHandle.getColumnFamilySchemas
+    // assign columnFamilyIds based on alphabetic name within map
+    val columnFamilySchemasWithIds = columnFamilySchemas.toList
+      .sortBy(_._2.colFamilyName)
+      .zipWithIndex
+      .map { case ((name, schema), index) =>
+        name -> schema.copy(colFamilyId = index.toShort)

Review Comment:
   I guess we don't use ID 0 on the streaming side (IDs there start from 1, so 0 is effectively reserved). Should the same be applied to the batch side as well?
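
   i.e., mirror the streaming path here, something like the following self-contained sketch (`CFSchema` stands in for `StateStoreColFamilySchema`):
   
   ```scala
   case class CFSchema(colFamilyName: String, colFamilyId: Short = 0)
   
   val columnFamilySchemas = Map("b" -> CFSchema("b"), "a" -> CFSchema("a"))
   val columnFamilySchemasWithIds = columnFamilySchemas.toList
     .sortBy(_._2.colFamilyName)
     .zipWithIndex
     .map { case ((name, schema), index) =>
       name -> schema.copy(colFamilyId = (index + 1).toShort) // +1 keeps ID 0 reserved
     }
   ```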



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala:
##########


Review Comment:
   Once we clarify the semantics of different CF IDs for the same CF name, I'd like to see a new test verifying that behavior.
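
   Rough shape of the test I'd like to see (helper and schema names are taken from this PR's diff and the existing suite; treat this as a sketch):
   
   ```scala
   test("re-creating a CF under the same name with a different ID") {
     tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
       val store = provider.getStore(0)
       store.createColFamilyIfAbsent("cf", 1.toShort, keySchema, valueSchema,
         NoPrefixKeyStateEncoderSpec(keySchema))
       // ... put a few rows into "cf" while it maps to ID 1 ...
       store.createColFamilyIfAbsent("cf", 2.toShort, keySchema, valueSchema,
         NoPrefixKeyStateEncoderSpec(keySchema))
       // ... then assert whichever semantic we settle on: either the second
       // call fails, or the rows written under ID 1 are still readable ...
     }
   }
   ```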



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala:
##########
@@ -166,21 +170,43 @@ class StateSchemaCompatibilityChecker(
   def validateAndMaybeEvolveStateSchema(

Review Comment:
   So we need to think about the semantics of the return value: we don't seem to have a case of schema evolution within each state, but adding or removing states is allowed. Should we call that "evolved" or not (given it's not really "schema" evolution)? Let's clarify the semantics of "evolution" in the method doc so there is no confusion.
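
   e.g., something like this in the method doc (suggested wording only):
   
   ```scala
   /**
    * @return (evolved, schemas): `evolved` is true iff the set of column families
    *         differs from the existing schema file (states added or removed).
    *         This is not per-state "schema" evolution; check() still rejects any
    *         change beyond column name / nullability.
    */
   ```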



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##########
@@ -18,47 +18,112 @@ package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.StateTTLSchema.TTL_VALUE_ROW_SCHEMA
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
-import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema}
+import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RangeKeyScanStateEncoderSpec, StateStoreColFamilySchema}
+import org.apache.spark.sql.streaming.TimeMode
+import org.apache.spark.sql.types.{NullType, StructField, StructType}
 
 object StateStoreColumnFamilySchemaUtils {
 
   def getValueStateSchema[T](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       valEncoder: Encoder[T],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
-   StateStoreColFamilySchema(
+      hasTtl: Boolean): List[StateStoreColFamilySchema] = {
+   List(StateStoreColFamilySchema(
       stateName,
       keyEncoder.schema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
-      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))) ++
+        (if (hasTtl) {
+            val ttlKeyRowSchema = getSingleKeyTTLRowSchema(keyEncoder.schema)
+            List(
+              StateStoreColFamilySchema(
+                s"_ttl_$stateName",
+                ttlKeyRowSchema,
+                TTL_VALUE_ROW_SCHEMA,
+                Some(RangeKeyScanStateEncoderSpec(ttlKeyRowSchema, Seq(0)))))
+        } else {
+            Nil
+        })
   }
 
   def getListStateSchema[T](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       valEncoder: Encoder[T],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
-  StateStoreColFamilySchema(
-      stateName,
-      keyEncoder.schema,
-      getValueSchemaWithTTL(valEncoder.schema, hasTtl),
-      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+      hasTtl: Boolean): List[StateStoreColFamilySchema] = {
+    List(
+      StateStoreColFamilySchema(
+        stateName,
+        keyEncoder.schema,
+        getValueSchemaWithTTL(valEncoder.schema, hasTtl),
+        Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+    ) ++ (if (hasTtl) {
+      val ttlKeyRowSchema = getSingleKeyTTLRowSchema(keyEncoder.schema)
+      List(
+        StateStoreColFamilySchema(
+          s"_ttl_$stateName",
+          ttlKeyRowSchema,
+          TTL_VALUE_ROW_SCHEMA,
+          Some(RangeKeyScanStateEncoderSpec(ttlKeyRowSchema, Seq(0)))))
+    } else {
+      Nil
+    })
   }
 
   def getMapStateSchema[K, V](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       userKeyEnc: Encoder[K],
       valEncoder: Encoder[V],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
+      hasTtl: Boolean): List[StateStoreColFamilySchema] = {
     val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
-    StateStoreColFamilySchema(
+    List(StateStoreColFamilySchema(
       stateName,
       compositeKeySchema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
-      Some(userKeyEnc.schema))
+      Some(userKeyEnc.schema))) ++ (if (hasTtl) {
+      val ttlKeyRowSchema = getCompositeKeyTTLRowSchema(
+        keyEncoder.schema, userKeyEnc.schema)
+      List(
+        StateStoreColFamilySchema(
+          s"_ttl_$stateName",
+          ttlKeyRowSchema,
+          TTL_VALUE_ROW_SCHEMA,
+          Some(RangeKeyScanStateEncoderSpec(ttlKeyRowSchema, Seq(0)))))
+    } else {
+      Nil
+    })
+  }
+
+  def getTimerSchema(
+      keyEncoder: ExpressionEncoder[Any],
+      timeMode: TimeMode): List[StateStoreColFamilySchema] = {
+    val timerCFName = if (timeMode == TimeMode.ProcessingTime) {
+      TimerStateUtils.PROC_TIMERS_STATE_NAME
+    } else {
+      TimerStateUtils.EVENT_TIMERS_STATE_NAME
+    }
+    val rowEncoder = new TimerKeyEncoder(keyEncoder)
+    val schemaForKeyRow = rowEncoder.schemaForKeyRow
+    val schemaForValueRow = StructType(Array(StructField("__dummy__", NullType)))

Review Comment:
   Why not do the same as with TTL_VALUE_ROW_SCHEMA: add a companion object in TimerStateImpl and define VALUE_ROW_SCHEMA there?
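
   e.g. (suggested shape, mirroring `StateTTLSchema.TTL_VALUE_ROW_SCHEMA`; whether it lives in TimerStateUtils or a TimerStateImpl companion is up to you):
   
   ```scala
   import org.apache.spark.sql.types.{NullType, StructField, StructType}
   
   object TimerStateUtils { // or: a companion object of TimerStateImpl
     val VALUE_ROW_SCHEMA: StructType =
       StructType(Array(StructField("__dummy__", NullType)))
   }
   ```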



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala:
##########
@@ -144,8 +147,16 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     test(s"registering timeouts with timeMode=$timeMode should succeed") {
      tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
         val store = provider.getStore(0)
+        val timerCFName = if (timeMode == "ProcessingTime") {

Review Comment:
   Maybe not a private method but a utility object, since I'm seeing the same code in other suites as well.
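
   Something like this, for instance (the object name is just a placeholder):
   
   ```scala
   object TimerStateTestUtils {
     // Single place for the timer CF name per timeMode string used in the suites;
     // the secondary-index CF name could be derived here as well.
     def timerCFName(timeMode: String): String =
       if (timeMode == "ProcessingTime") {
         TimerStateUtils.PROC_TIMERS_STATE_NAME
       } else {
         TimerStateUtils.EVENT_TIMERS_STATE_NAME
       }
   }
   ```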



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -188,7 +192,8 @@ class ValueStateSuite extends StateVariableSuiteBase {
     val ex = intercept[StateStoreMultipleColumnFamiliesNotSupportedException] {
       provider.init(
        storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
-        useColumnFamilies = true, storeConf, new Configuration)
+        useColumnFamilies = true,

Review Comment:
   nit: unnecessary change?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -129,6 +129,7 @@ trait StateStore extends ReadStateStore {
    */
   def createColFamilyIfAbsent(
       colFamilyName: String,
+      columnFamilyId: Short,

Review Comment:
   This might be tricky on the migration scenario, but allowing overwrite to 
the CF ID for the same CF name does not help on migration anyway. We will only 
see one CF ID for one CF name.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

