HeartSaVioR commented on code in PR #47524:
URL: https://github.com/apache/spark/pull/47524#discussion_r1710830978
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ##########
@@ -166,21 +170,43 @@ class StateSchemaCompatibilityChecker(
   def validateAndMaybeEvolveStateSchema(
       newStateSchema: List[StateStoreColFamilySchema],
       ignoreValueSchema: Boolean,
-      stateSchemaVersion: Int): Boolean = {
+      stateSchemaVersion: Int): (Boolean, List[StateStoreColFamilySchema]) = {
     val existingStateSchemaList = getExistingKeyAndValueSchema().sortBy(_.colFamilyName)
-    val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName)
+    // assign colFamilyIds based on position in list
+    var maxId: Short = existingStateSchemaList.map(_.colFamilyId).maxOption.getOrElse(0)
     if (existingStateSchemaList.isEmpty) {
       // write the schema file if it doesn't exist
+      val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName).zipWithIndex.map {
+        case (schema, index) =>
+          schema.copy(colFamilyId = (index + 1).toShort)
+      }
       createSchemaFile(newStateSchemaList, stateSchemaVersion)
-      true
+      (true, newStateSchemaList)
     } else {
-      // validate if the new schema is compatible with the existing schema
-      existingStateSchemaList.lazyZip(newStateSchemaList).foreach {
-        case (existingStateSchema, newStateSchema) =>
-          check(existingStateSchema, newStateSchema, ignoreValueSchema)
+      // create a hashmap of name to schema from existing state schema list
+      val existingSchemas = existingStateSchemaList.map { schema =>
+        schema.colFamilyName -> schema
+      }.toMap
+      // For each column family we are creating, check if it existed in the
+      // previous run, and assign its ID to the old ID if the schema hasn't changed.
+      // If the schema has evolved, assign it a new ID.

Review Comment:
I don't quite understand this, but I might be missing something.

1. Here we use `check()`, which actually checks the "compatibility" of the schema rather than handling schema evolution. (We allow changing the name and nullability of a column in state.) I guess the spec of schema evolution we are demanding for transformWithState is more flexible than what we do in `check()`.
2. If we assign a new ID, wouldn't we lose the existing data, i.e. data loss? Where do we migrate the existing values from the old ID to the new ID? If there is no such migration, shall we defer this change to the time when we truly support schema evolution? We even return `false` to indicate to the caller that there was no schema evolution, yet what we are doing here is quite different.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ##########
@@ -129,6 +129,7 @@ trait StateStore extends ReadStateStore {
    */
   def createColFamilyIfAbsent(
       colFamilyName: String,
+      columnFamilyId: Short,

Review Comment:
I guess it's not happening now (if you add migration logic it could start happening), but what if we re-assign the CF ID for the same CF name due to a schema change? Will we perform `removeColFamilyIfExists` and then `createColFamilyIfAbsent`? Isn't it dangerous to allow calling with the same CF name but a different CF ID without removing the existing CF?
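To make the concern concrete, I'd expect the mapping to be guarded along these lines (just a sketch: the object/method names and the exception are illustrative, and I'm using a plain mutable map in place of the provider's actual field):

```scala
import scala.collection.mutable

object ColFamilyIdGuard {
  // Stand-in for the provider's name -> ID map; the real field in
  // RocksDBStateStoreProvider is declared differently.
  private val colFamilyNameToIdMap = mutable.HashMap.empty[String, Short]

  // Fail fast when an existing CF name shows up with a different CF ID, instead
  // of silently overwriting the mapping and orphaning the previously written data.
  // The exception type is illustrative, not an existing Spark error class.
  def registerColFamilyId(colFamilyName: String, colFamilyId: Short): Unit = {
    colFamilyNameToIdMap.get(colFamilyName) match {
      case Some(existingId) if existingId != colFamilyId =>
        throw new IllegalStateException(
          s"Column family '$colFamilyName' is already mapped to ID $existingId; " +
            s"re-mapping it to $colFamilyId would leave the existing data unreachable. " +
            "Remove the column family explicitly if the re-mapping is intentional.")
      case _ =>
        colFamilyNameToIdMap.update(colFamilyName, colFamilyId)
    }
  }
}
```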
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ##########
@@ -166,21 +170,43 @@ class StateSchemaCompatibilityChecker(
   def validateAndMaybeEvolveStateSchema(
       newStateSchema: List[StateStoreColFamilySchema],
       ignoreValueSchema: Boolean,
-      stateSchemaVersion: Int): Boolean = {
+      stateSchemaVersion: Int): (Boolean, List[StateStoreColFamilySchema]) = {
     val existingStateSchemaList = getExistingKeyAndValueSchema().sortBy(_.colFamilyName)
-    val newStateSchemaList = newStateSchema.sortBy(_.colFamilyName)
+    // assign colFamilyIds based on position in list

Review Comment:
This code comment is misleading; it only applies when existingStateSchemaList is empty, right? Please move it to the proper place.

########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala: ##########
@@ -234,7 +253,8 @@ class MapStateSuite extends StateVariableSuiteBase {
       val batchTimestampMs = 10
       val handle = new StatefulProcessorHandleImpl(store, UUID.randomUUID(),
         Encoders.STRING.asInstanceOf[ExpressionEncoder[Any]],
-        TimeMode.ProcessingTime(), batchTimestampMs = Some(batchTimestampMs))
+        TimeMode.ProcessingTime(), batchTimestampMs = Some(batchTimestampMs),
+        columnFamilyIds = Map("testState" -> 1.toShort))

Review Comment:
Missing `_ttl_testState`? It doesn't seem to matter for the test result, though.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ##########
@@ -86,7 +86,8 @@ class StatefulProcessorHandleImpl(
     timeMode: TimeMode,
     isStreaming: Boolean = true,
     batchTimestampMs: Option[Long] = None,
-    metrics: Map[String, SQLMetric] = Map.empty)
+    metrics: Map[String, SQLMetric] = Map.empty,
+    columnFamilyIds: Map[String, Short] = Map.empty)

Review Comment:
nit: maybe add `@param columnFamilyIds` to the class doc and fill out the explanation? It's probably odd for only this parameter to be missing.

########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ##########
@@ -144,8 +147,16 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     test(s"registering timeouts with timeMode=$timeMode should succeed") {
       tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
         val store = provider.getStore(0)
+        val timerCFName = if (timeMode == "ProcessingTime") {

Review Comment:
Why not extract this into a private method that provides the two CF names for a given timeMode? It looks like we are introducing a non-trivial amount of duplicated code.
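Something like this, for instance (a rough sketch; the helper name and the two CF-name suffixes are placeholders for whatever TimerStateImpl actually appends to its two column families):

```scala
import org.apache.spark.sql.execution.streaming.TimerStateUtils

// Sketch of a shared helper for the two timer CF names used across these suites.
// The "_keyToTimestamp" / "_timestampToKey" suffixes are assumptions here; use
// the suffix constants the production code really defines.
object TimerCFTestUtils {
  def timerCFNames(timeMode: String): (String, String) = {
    val base = if (timeMode == "ProcessingTime") {
      TimerStateUtils.PROC_TIMERS_STATE_NAME
    } else {
      TimerStateUtils.EVENT_TIMERS_STATE_NAME
    }
    (base + "_keyToTimestamp", base + "_timestampToKey")
  }
}
```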
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ##########
@@ -129,6 +129,7 @@ trait StateStore extends ReadStateStore {
    */
   def createColFamilyIfAbsent(
       colFamilyName: String,
+      columnFamilyId: Short,

Review Comment:
Also, please update the method doc to describe the semantics of the behavior around the CF ID. It used to be obvious, and it no longer is. (It still follows the existing behavior, but it's confusing and dangerous now that the CF ID is the "real" key.)

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala: ##########
@@ -58,16 +58,19 @@ private[sql] class RocksDBStateStoreProvider

   override def createColFamilyIfAbsent(
       colFamilyName: String,
+      colFamilyId: Short,
       keySchema: StructType,
       valueSchema: StructType,
       keyStateEncoderSpec: KeyStateEncoderSpec,
       useMultipleValuesPerKey: Boolean = false,
       isInternal: Boolean = false): Unit = {
-    val newColFamilyId = ColumnFamilyUtils.createColFamilyIfAbsent(colFamilyName, isInternal)
-
+    ColumnFamilyUtils.
+      verifyColFamilyCreationOrDeletion("create_col_family", colFamilyName, isInternal)
+    colFamilyNameToIdMap.put(colFamilyName, colFamilyId)

Review Comment:
As I commented on the StateStore trait, we need to revisit the semantics here. If we blindly allow this overwrite, we will lose access to the existing data (effectively a dangling pointer), meaning data loss.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ##########
@@ -242,7 +243,20 @@ class IncrementalExecution(
             metadataWriter.write(metadata)
           case _ =>
         }
-        statefulOp
+        statefulOp match {
+          case tws: TransformWithStateExec =>

Review Comment:
It doesn't look like we need to scope this to TransformWithStateExec, since we go through the same code path for every stateful operator. I know it's only effective for TransformWithStateExec, but this is just to have fewer code branches.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala: ##########
@@ -703,6 +706,24 @@ object StreamExecution {
       "py4j.protocol.Py4JJavaError: An error occurred while calling" +
         s"((.|\\r\\n|\\r|\\n)*)(${IO_EXCEPTION_NAMES.mkString("|")})").r

+  private val columnFamilySchemas:

Review Comment:
Is there a reason to require a single cache for this? A StreamExecution instance is created per query, so once we persist this in StreamExecution and have IncrementalExecution write updates back, there does not seem to be a reason to keep a "global" cache for it. There would be no cleanup to worry about, either.

########## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala: ##########
@@ -902,7 +902,8 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       keySchema,
       new StructType().add("value", LongType, false),
       Some(NoPrefixKeyStateEncoderSpec(keySchema)),
-      None
+      None,
+      1.toShort

Review Comment:
We predicted the future? :) The state names are alphabetically sorted, so the first one always gets ID 1.
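To spell out why the literal works (a toy illustration; the state names here are made up): the assigned IDs are a pure function of the sorted CF names, starting from 1 on the streaming side.

```scala
// IDs follow the alphabetical order of column family names, starting from 1.
val names = List("listState", "countState", "mapState")
val assignedIds = names.sorted.zipWithIndex.map { case (name, idx) =>
  name -> (idx + 1).toShort
}.toMap
// assignedIds: Map(countState -> 1, listState -> 2, mapState -> 3)
```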
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ##########
@@ -667,12 +678,35 @@ object TransformWithStateExec {
       initialStateDeserializer: Expression,
       initialState: SparkPlan): SparkPlan = {
     val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+
+    // We are assigning the columnFamilyIds based on their alphabetic order
+    // in the batch world since there is no notion of restarts, and we don't need
+    // the columnFamilyIds here to be consistent between runs
+    val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(timeMode, keyEncoder)
+    driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT)
+    statefulProcessor.setHandle(driverProcessorHandle)
+    statefulProcessor.init(outputMode, timeMode)
+    if (timeMode != NoTime) {
+      driverProcessorHandle.registerTimer(0L)
+    }
+    val columnFamilySchemas = driverProcessorHandle.getColumnFamilySchemas
+    // assign columnFamilyIds based on alphabetic name within map
+    val columnFamilySchemasWithIds = columnFamilySchemas.toList
+      .sortBy(_._2.colFamilyName)
+      .zipWithIndex
+      .map { case ((name, schema), index) =>
+        name -> schema.copy(colFamilyId = index.toShort)

Review Comment:
I guess we don't hand out ID 0 on the streaming side (new IDs start from 1 there). Should the same be applied to the batch side as well?

########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala: ##########

Review Comment:
Once we make the semantics of different CF IDs for the same CF name clear, I'd like to see a new test verifying the behavior.

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala: ##########
@@ -166,21 +170,43 @@ class StateSchemaCompatibilityChecker(
   def validateAndMaybeEvolveStateSchema(

Review Comment:
So we need to think about the semantics of the return value: we don't seem to have a case of schema evolution for any individual state, but having more or fewer states is allowed. Should we call this "evolved" or not (given it's not "schema" evolution)? Let's clarify the semantics of "evolution" in the method doc so that there is no confusion.
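For example, something along these lines would remove the ambiguity (the wording is only a suggestion, and the meaning of the returned Boolean is exactly the part to pin down):

```scala
/**
 * Validates the given column family schemas against the schema file written by
 * the previous run, creating the file if it does not exist yet.
 *
 * Note on "evolution": each individual column family must still pass `check()`
 * (so no real schema evolution yet); the only change tolerated across runs is
 * adding or removing entire state variables.
 *
 * @return a pair of (whether the set of state schemas changed and the file was
 *         (re)written -- the exact semantics to pin down here -- and the schema
 *         list with column family IDs assigned)
 */
def validateAndMaybeEvolveStateSchema(
    newStateSchema: List[StateStoreColFamilySchema],
    ignoreValueSchema: Boolean,
    stateSchemaVersion: Int): (Boolean, List[StateStoreColFamilySchema])
```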
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ##########
@@ -18,47 +18,112 @@ package org.apache.spark.sql.execution.streaming

 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.StateTTLSchema.TTL_VALUE_ROW_SCHEMA
 import org.apache.spark.sql.execution.streaming.TransformWithStateKeyValueRowSchemaUtils._
-import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, StateStoreColFamilySchema}
+import org.apache.spark.sql.execution.streaming.state.{NoPrefixKeyStateEncoderSpec, PrefixKeyScanStateEncoderSpec, RangeKeyScanStateEncoderSpec, StateStoreColFamilySchema}
+import org.apache.spark.sql.streaming.TimeMode
+import org.apache.spark.sql.types.{NullType, StructField, StructType}

 object StateStoreColumnFamilySchemaUtils {

   def getValueStateSchema[T](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       valEncoder: Encoder[T],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
-    StateStoreColFamilySchema(
+      hasTtl: Boolean): List[StateStoreColFamilySchema] = {
+    List(StateStoreColFamilySchema(
       stateName,
       keyEncoder.schema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
-      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))) ++
+      (if (hasTtl) {
+        val ttlKeyRowSchema = getSingleKeyTTLRowSchema(keyEncoder.schema)
+        List(
+          StateStoreColFamilySchema(
+            s"_ttl_$stateName",
+            ttlKeyRowSchema,
+            TTL_VALUE_ROW_SCHEMA,
+            Some(RangeKeyScanStateEncoderSpec(ttlKeyRowSchema, Seq(0)))))
+      } else {
+        Nil
+      })
   }

   def getListStateSchema[T](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       valEncoder: Encoder[T],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
-    StateStoreColFamilySchema(
-      stateName,
-      keyEncoder.schema,
-      getValueSchemaWithTTL(valEncoder.schema, hasTtl),
-      Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+      hasTtl: Boolean): List[StateStoreColFamilySchema] = {
+    List(
+      StateStoreColFamilySchema(
+        stateName,
+        keyEncoder.schema,
+        getValueSchemaWithTTL(valEncoder.schema, hasTtl),
+        Some(NoPrefixKeyStateEncoderSpec(keyEncoder.schema)))
+    ) ++ (if (hasTtl) {
+      val ttlKeyRowSchema = getSingleKeyTTLRowSchema(keyEncoder.schema)
+      List(
+        StateStoreColFamilySchema(
+          s"_ttl_$stateName",
+          ttlKeyRowSchema,
+          TTL_VALUE_ROW_SCHEMA,
+          Some(RangeKeyScanStateEncoderSpec(ttlKeyRowSchema, Seq(0)))))
+    } else {
+      Nil
+    })
   }

   def getMapStateSchema[K, V](
       stateName: String,
       keyEncoder: ExpressionEncoder[Any],
       userKeyEnc: Encoder[K],
       valEncoder: Encoder[V],
-      hasTtl: Boolean): StateStoreColFamilySchema = {
+      hasTtl: Boolean): List[StateStoreColFamilySchema] = {
     val compositeKeySchema = getCompositeKeySchema(keyEncoder.schema, userKeyEnc.schema)
-    StateStoreColFamilySchema(
+    List(StateStoreColFamilySchema(
       stateName,
       compositeKeySchema,
       getValueSchemaWithTTL(valEncoder.schema, hasTtl),
       Some(PrefixKeyScanStateEncoderSpec(compositeKeySchema, 1)),
-      Some(userKeyEnc.schema))
+      Some(userKeyEnc.schema))) ++ (if (hasTtl) {
+      val ttlKeyRowSchema = getCompositeKeyTTLRowSchema(
+        keyEncoder.schema, userKeyEnc.schema)
+      List(
+        StateStoreColFamilySchema(
+          s"_ttl_$stateName",
+          ttlKeyRowSchema,
+          TTL_VALUE_ROW_SCHEMA,
+          Some(RangeKeyScanStateEncoderSpec(ttlKeyRowSchema, Seq(0)))))
+    } else {
+      Nil
+    })
+  }
+
+  def getTimerSchema(
+      keyEncoder: ExpressionEncoder[Any],
+      timeMode: TimeMode): List[StateStoreColFamilySchema] = {
+    val timerCFName = if (timeMode == TimeMode.ProcessingTime) {
+      TimerStateUtils.PROC_TIMERS_STATE_NAME
+    } else {
+      TimerStateUtils.EVENT_TIMERS_STATE_NAME
+    }
+    val rowEncoder = new TimerKeyEncoder(keyEncoder)
+    val schemaForKeyRow = rowEncoder.schemaForKeyRow
+    val schemaForValueRow = StructType(Array(StructField("__dummy__", NullType)))

Review Comment:
Why not do the same as with TTL_VALUE_ROW_SCHEMA: have a companion object in TimerStateImpl and define VALUE_ROW_SCHEMA there?
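i.e. something like the following sketch (the object and field names are only suggestions), so `getTimerSchema` could just reference `TimerStateImpl.VALUE_ROW_SCHEMA` instead of rebuilding the schema inline:

```scala
import org.apache.spark.sql.types.{NullType, StructField, StructType}

// Mirror the StateTTLSchema/TTL_VALUE_ROW_SCHEMA pattern: define the dummy
// timer value schema once instead of constructing it at every call site.
object TimerStateImpl {
  val VALUE_ROW_SCHEMA: StructType =
    StructType(Array(StructField("__dummy__", NullType)))
}
```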
########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StatefulProcessorHandleSuite.scala: ##########
@@ -144,8 +147,16 @@ class StatefulProcessorHandleSuite extends StateVariableSuiteBase {
     test(s"registering timeouts with timeMode=$timeMode should succeed") {
       tryWithProviderResource(newStoreProviderWithStateVariable(true)) { provider =>
         val store = provider.getStore(0)
+        val timerCFName = if (timeMode == "ProcessingTime") {

Review Comment:
Maybe not a private method but a utility object, as I'm seeing the same code in other suites as well.

########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala: ##########
@@ -188,7 +192,8 @@ class ValueStateSuite extends StateVariableSuiteBase {
     val ex = intercept[StateStoreMultipleColumnFamiliesNotSupportedException] {
       provider.init(
         storeId, keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
-        useColumnFamilies = true, storeConf, new Configuration)
+        useColumnFamilies = true,

Review Comment:
nit: unnecessary change?

########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ##########
@@ -129,6 +129,7 @@ trait StateStore extends ReadStateStore {
    */
   def createColFamilyIfAbsent(
       colFamilyName: String,
+      columnFamilyId: Short,

Review Comment:
This might be tricky in a migration scenario, but allowing the CF ID to be overwritten for the same CF name does not help with migration anyway. We will only ever see one CF ID for one CF name.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.