anishshri-db commented on code in PR #49277:
URL: https://github.com/apache/spark/pull/49277#discussion_r1902071479
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala:
##########
@@ -234,7 +235,8 @@ trait TransformWithStateMetadataUtils extends Logging {
         newSchemas.values.toList, session.sessionState, stateSchemaVersion,
         storeName = StateStoreId.DEFAULT_STORE_NAME,
         oldSchemaFilePath = oldStateSchemaFilePath,
-        newSchemaFilePath = Some(newStateSchemaFilePath)))
+        newSchemaFilePath = Some(newStateSchemaFilePath),
+        schemaEvolutionEnabled = stateStoreEncodingFormat == "avro"))

Review Comment:
   Do we always convert `stateStoreEncodingFormat` to lowercase before the comparison here ?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -523,6 +547,14 @@
     if (!condition) { throw new IllegalStateException(msg) }
   }

+  private[sql] def getSchemaProvider: TestStateSchemaProvider = {

Review Comment:
   Can't this be within the test code ?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -680,22 +712,37 @@ object RocksDBStateStoreProvider {
       stateStoreEncoding: String,
       encoderCacheKey: StateRowEncoderCacheKey,
       keyStateEncoderSpec: KeyStateEncoderSpec,
-      valueSchema: StructType): RocksDBDataEncoder = {
+      valueSchema: StructType,
+      stateSchemaProvider: Option[StateSchemaProvider],
+      columnFamilyInfo: Option[ColumnFamilyInfo] = None): RocksDBDataEncoder = {
     assert(Set("avro", "unsaferow").contains(stateStoreEncoding))
     RocksDBStateStoreProvider.dataEncoderCache.get(
       encoderCacheKey,
       new java.util.concurrent.Callable[RocksDBDataEncoder] {
         override def call(): RocksDBDataEncoder = {
           if (stateStoreEncoding == "avro") {
-            new AvroStateEncoder(keyStateEncoderSpec, valueSchema, Some(DEFAULT_SCHEMA_IDS))
+            new AvroStateEncoder(
+              keyStateEncoderSpec,
+              valueSchema,
+              stateSchemaProvider,
+              columnFamilyInfo
+            )
           } else {
-            new UnsafeRowDataEncoder(keyStateEncoderSpec, valueSchema, None)
+            new UnsafeRowDataEncoder(
+              keyStateEncoderSpec,
+              valueSchema,
+              stateSchemaProvider,
+              columnFamilyInfo
+            )
           }
         }
       }
     )
   }

+  private[sql] def clearDataEncoderCache: Unit =

Review Comment:
   Where is this used ?

##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala:
##########
@@ -82,11 +82,23 @@ private[sql] class RocksDBStateStoreProvider
       stateStoreName = stateStoreId.storeName,
       colFamilyName = colFamilyName)

-    val dataEncoder = getDataEncoder(
-      stateStoreEncoding, dataEncoderCacheKey, keyStateEncoderSpec, valueSchema)
+    val columnFamilyInfo = Some(ColumnFamilyInfo(colFamilyName, newColFamilyId))
+    stateSchemaProvider match {
+      case Some(t: TestStateSchemaProvider) =>

Review Comment:
   intentional ?
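
The first review comment asks whether `stateStoreEncodingFormat` is guaranteed to be lowercase before the `== "avro"` comparison. A minimal sketch of a locale-safe, case-insensitive check; the helper name is illustrative and not taken from the PR:

```scala
import java.util.Locale

object EncodingFormatCheck {
  // Normalize once with Locale.ROOT so values such as "Avro" or "AVRO" still
  // enable schema evolution, regardless of the JVM default locale.
  def isAvroEncoding(stateStoreEncodingFormat: String): Boolean =
    stateStoreEncodingFormat.toLowerCase(Locale.ROOT) == "avro"
}
```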
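For context on the `getDataEncoder` hunk above: the encoder is built lazily through a keyed cache, so repeated lookups with the same `StateRowEncoderCacheKey` reuse one encoder instance. A stripped-down sketch of that caching pattern, using Guava directly and placeholder types rather than the PR's own classes:

```scala
import java.util.concurrent.Callable

import com.google.common.cache.{Cache, CacheBuilder}

object EncoderCacheSketch {
  // Placeholder value type; the real cache is keyed by StateRowEncoderCacheKey
  // and holds RocksDBDataEncoder instances.
  final case class Encoder(name: String)

  // The maximum size here is only illustrative.
  private val cache: Cache[String, Encoder] =
    CacheBuilder.newBuilder().maximumSize(1000).build[String, Encoder]()

  // First call for a key runs `create`; later calls return the cached value.
  def getOrCreate(key: String)(create: => Encoder): Encoder =
    cache.get(key, new Callable[Encoder] { override def call(): Encoder = create })
}
```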
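On "Where is this used ?": a cache-reset hook like `clearDataEncoderCache` is typically called only from tests so that each suite starts with a cold encoder cache. A hypothetical call site is sketched below; the suite name is made up, and since the method is `private[sql]` the test would need to live under the `org.apache.spark.sql` package tree:

```scala
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider

// Hypothetical suite illustrating where a test-only cache reset would be called.
class RocksDBDataEncoderCacheSuite extends AnyFunSuite with BeforeAndAfterEach {

  override def beforeEach(): Unit = {
    super.beforeEach()
    // Drop encoders cached by earlier tests so schema changes made in this
    // suite are not masked by a stale cached encoder.
    RocksDBStateStoreProvider.clearDataEncoderCache
  }

  test("encoder is rebuilt after the cache is cleared") {
    // ... exercise the state store APIs here ...
  }
}
```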