Frederic Tardif created KAFKA-7662:
--------------------------------------

             Summary: Avro schema upgrade not supported on globalTable
                 Key: KAFKA-7662
                 URL: https://issues.apache.org/jira/browse/KAFKA-7662
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 1.0.0
            Reporter: Frederic Tardif
I did quite a bit of testing around Avro schema upgrades, and the behaviour was not what I expected when an Avro record is used as the key of a global table backed by a RocksDB store. Setup:
* local Confluent suite 4.0.2
* test with a streams app and a producer (v 1.0.0)
* key schemas:
{code:java}
schema version @1
{
  "namespace": "com.bell.cts.livecms.livemedia.topic",
  "type": "record",
  "name": "EventKey",
  "fields": [
    {"name": "keyAttribute1", "type": "string"}
  ]
}

schema version @2
{
  "namespace": "com.bell.cts.livecms.livemedia.topic",
  "type": "record",
  "name": "EventKey",
  "fields": [
    {"name": "keyAttribute1", "type": "string"},
    {"name": "keyAttribute2", "type": ["null", "string"], "default": null}
  ]
}
{code}
* TEST1 (PASS)
** using schema version @1
** produce record1=[k@1, v@1]
** streams app loads record1 into the global table and stores it locally in RocksDB
** asyncAssert that store.get(k@1)=v@1 : PASS
* TEST2 (PASS)
** using schema version @1
** delete the local store (and checkpoint)
** streams app reloads record1 into the global table and stores it locally in RocksDB
** asyncAssert that store.get(k@1)=v@1 : PASS
* TEST3 (FAIL)
** using schema version @2
** keep the local store
** streams app does not reload record1 from the topic because of the local offset
** asyncAssert that store.get(k@1)=v@1 : FAIL
** however store.all().next().key.equals(k@2), as the key is rebuilt using schema version 2
** this would be explained by the fact that the RocksDB store has the record persisted with the magic byte based on schema version 1
** not ideal, but I could consider it acceptable to have to delete the local store in this case
* TEST4 (FAIL)
** using schema version @2
** delete the local store (and checkpoint)
** streams app loads record1 (produced with schema @1) into the global table and stores it locally in RocksDB
** asyncAssert that store.get(k@2)=v@2 : FAIL
** however store.all().next().key.equals(k@2), as the key is rebuilt using schema version 2
** I can't quite understand this one.
I would have expected that the RocksDB store would now be provisioned with a serialized version of the record based on schema v2 (as it went through the streams app underpinning the store materialization).
* TEST5 (FAIL)
** using schema version @2
** produce record2=[k@2, v@2] (meant to be backward compatible and logically equal to record1)
** streams app processes record1 (produced with schema @1) and record2 (produced with schema @2) and materializes the global table stored locally in RocksDB
** asyncAssert that store.get(k@2)=v@2 : PASS, but the store now has 2 entries !!!
** it looks as if the stream.groupBy(key) of the topic underpinning the globalTable materialization did not group the 2 record keys together, although record1.key.equals(record2.key) is true in Java (verified by looping over the store)

Reading from the upstream raw topic throughout the testing:
{code:java}
/tmp$ kafka-avro-console-consumer --topic topic-test-5 --bootstrap-server localhost:9092 --property schema.registry.url=http://127.0.0.1:8081 --property print.key=true --from-beginning
{"keyAttribute1":"key-attribute-1"}	{"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1"}	{"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1"}	{"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1","keyAttribute2":null}	{"valueAttribute1":"value-1"}
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
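A likely explanation for the behaviour above: the Confluent Avro serializer frames every message with a magic byte plus a 4-byte schema-registry id ahead of the Avro payload, and RocksDB (like the byte-level grouping behind the global table) compares keys as raw bytes. So k@1 and k@2 serialize to different byte arrays even when the deserialized records are equal in Java. A minimal sketch of that effect — the `frame` helper, the schema ids, and the literal payload bytes are illustrative stand-ins, not the real serializer:
{code:java}
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class AvroKeyBytesSketch {

    // Hypothetical helper mimicking the Confluent wire format:
    // magic byte 0x00, then a 4-byte big-endian schema id, then the Avro payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + avroPayload.length);
        buf.put((byte) 0x00);   // magic byte
        buf.putInt(schemaId);   // schema registry id
        buf.put(avroPayload);   // Avro-encoded fields
        return buf.array();
    }

    public static void main(String[] args) {
        // Stand-in for the Avro encoding of {"keyAttribute1": "key-attribute-1"}.
        byte[] v1Payload = "key-attribute-1".getBytes();
        // Under schema @2 the writer additionally emits the union branch index
        // (0 = null) for the optional keyAttribute2, so the payload differs too.
        byte[] v2Payload = Arrays.copyOf(v1Payload, v1Payload.length + 1);

        byte[] kAt1 = frame(1, v1Payload); // schema id 1 (illustrative)
        byte[] kAt2 = frame(2, v2Payload); // schema id 2 (illustrative)

        // Compared as raw bytes, the two keys are distinct ...
        System.out.println(Arrays.equals(kAt1, kAt2)); // false

        // ... so a store keyed on serialized bytes keeps both records,
        // matching the 2 entries observed in TEST5, even though the
        // deserialized keys satisfy record1.key.equals(record2.key).
        Map<String, String> byteKeyedStore = new HashMap<>();
        byteKeyedStore.put(Arrays.toString(kAt1), "value-1");
        byteKeyedStore.put(Arrays.toString(kAt2), "value-1");
        System.out.println(byteKeyedStore.size()); // 2
    }
}
{code}
This also suggests why TEST4 fails: the restored store is rebuilt from the raw topic bytes (written under schema id 1), so a lookup framed under schema id 2 never matches at the byte level.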