ericm-db commented on code in PR #49277: URL: https://github.com/apache/spark/pull/49277#discussion_r1918067901
########## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithListStateSuite.scala: ########## @@ -328,4 +390,44 @@ class TransformWithListStateSuite extends StreamTest ) } } + + testWithEncoding("avro")("ListState schema evolution - add field") { + withSQLConf( + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) { + withTempDir { dir => + val inputData = MemoryStream[String] + + // First run with initial schema + val result1 = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new InitialListStateProcessor(), + TimeMode.None(), + OutputMode.Update()) + + testStream(result1, OutputMode.Update())( + StartStream(checkpointLocation = dir.getCanonicalPath), + AddData(inputData, "a", "b"), + CheckNewAnswer(("a", "a"), ("b", "b")), + StopStream + ) + + // Second run with evolved schema + val result2 = inputData.toDS() + .groupByKey(x => x) + .transformWithState(new EvolvedListStateProcessor(), + TimeMode.None(), + OutputMode.Update()) + + testStream(result2, OutputMode.Update())( + StartStream(checkpointLocation = dir.getCanonicalPath), + AddData(inputData, "c"), + CheckNewAnswer(("c", "c")), + // Verify we can still read old state format Review Comment: Modified the test and the processors, should be better now. ########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala: ########## @@ -496,6 +498,486 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid } } + test("AvroStateEncoder - add field") { + val keySchema = StructType(Seq( + StructField("k", StringType) + )) + + val initialValueSchema = StructType(Seq( + StructField("value", IntegerType, false) + )) + + val evolvedValueSchema = StructType(Seq( + StructField("value", IntegerType, false), + StructField("timestamp", LongType, true) + )) + + // Create test state schema provider + val testProvider = new TestStateSchemaProvider() + + // Add initial schema version + testProvider.captureSchema( + StateStore.DEFAULT_COL_FAMILY_NAME, + keySchema, + initialValueSchema, + valueSchemaId = 0 + ) + + // Create encoder with initial schema + val encoder1 = new AvroStateEncoder( + NoPrefixKeyStateEncoderSpec(keySchema), + initialValueSchema, + Some(testProvider), + Some(ColumnFamilyInfo(StateStore.DEFAULT_COL_FAMILY_NAME, 1)) + ) + + // Create test data + val proj = UnsafeProjection.create(initialValueSchema) + val row1 = proj.apply(InternalRow(1)) + + // Encode with schema v0 + val encoded = encoder1.encodeValue(row1) + + // Add evolved schema + testProvider.captureSchema( + StateStore.DEFAULT_COL_FAMILY_NAME, + keySchema, + evolvedValueSchema, + valueSchemaId = 1 + ) + + // Create encoder with initial schema + val encoder2 = new AvroStateEncoder( + NoPrefixKeyStateEncoderSpec(keySchema), + evolvedValueSchema, + Some(testProvider), + Some(ColumnFamilyInfo(StateStore.DEFAULT_COL_FAMILY_NAME, 1)) + ) + + // Decode with evolved schema + val decoded = encoder2.decodeValue(encoded) + + // Should be able to read old format + assert(decoded.getInt(0) === 1) + assert(decoded.getLong(1) === 0L) // New field should be null + + // Encode with new schema + val proj2 = UnsafeProjection.create(evolvedValueSchema) + val row2 = proj2.apply(InternalRow(2, 100L)) + val encoded2 = encoder2.encodeValue(row2) + + // Should write with new schema version + val decoded2 = encoder2.decodeValue(encoded2) + assert(decoded2.getInt(0) === 2) + assert(decoded2.getLong(1) === 100L) + } + + test("AvroStateEncoder - add field with null") { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org