Frederic Tardif created KAFKA-7662:
--------------------------------------
Summary: Avro schema upgrade not supported on globalTable
Key: KAFKA-7662
URL: https://issues.apache.org/jira/browse/KAFKA-7662
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 1.0.0
Reporter: Frederic Tardif
I did quite a bit of testing around Avro schema upgrades, and the behavior was not
what I expected when an Avro record is used as the key of a global table backed by
a RocksDB store.
setup:
* local confluent suite 4.0.2
* test with stream app and producer (v 1.0.0)
* schemas (key) :
{code:java}
schema version @1
{
  "namespace" : "com.bell.cts.livecms.livemedia.topic",
  "type" : "record",
  "name" : "EventKey",
  "fields" : [
    {"name" : "keyAttribute1", "type" : "string"}
  ]
}

schema version @2
{
  "namespace" : "com.bell.cts.livecms.livemedia.topic",
  "type" : "record",
  "name" : "EventKey",
  "fields" : [
    {"name" : "keyAttribute1", "type" : "string"},
    {"name" : "keyAttribute2", "type" : ["null", "string"], "default" : null}
  ]
}
{code}
* TEST1 (PASS)
** using schema version @1
** produce record1=[k@1, v@1]
** stream app loads record1 into the global table and stores it locally in RocksDB
** asyncAssert that store.get(k@1)=v@1 : PASS
* TEST2 (PASS)
** using schema version @1
** delete local store (and checkpoint)
** stream app loads record1 into the global table and stores it locally in RocksDB
** asyncAssert that store.get(k@1)=v@1 : PASS
* TEST3 (FAIL)
** using schema version @2
** keep local store
** stream app does not reload record1 from the topic because of the local checkpoint offset
** asyncAssert that store.get(k@1)=v@1 : FAIL
** however store.all().next().key.equals(k@2) is true, as the stored key is deserialized using schema version 2
** this would be explained by the fact that the RocksDB store has the record's magic bytes (schema ID) persisted based on schema version 1
** Not ideal, but I could consider it acceptable to have to delete the local store in this case.
* TEST4 (FAIL)
** using schema version @2
** delete local store (and checkpoint)
** stream app loads record1 (produced with schema @1) into the global table and
stores it locally in RocksDB
** asyncAssert that store.get(k@2)=v@2 : FAIL
** however store.all().next().key.equals(k@2) is true, as the stored key is deserialized using schema version 2
** I can't quite understand this one. I would have expected the RocksDB
store to now be provisioned with a serialization of the record based
on schema v2 (since it went through the stream app underpinning the store
materialization)
* TEST5 (FAIL)
** using schema version @2
** produce record2=[k@2, v@2] (meant to be backward compatible and logically
equal to record1)
** stream app processes record1 (produced with schema @1) and
record2 (produced with schema @2) and materializes the global table stored
locally in RocksDB
** asyncAssert that store.get(k@2)=v@2 : PASS, but the store now has 2 entries!
** it looks as if the stream.groupBy(key) of the topic underpinning the
global table materialization did not group the 2 record keys together, although
record1.key.equals(record2.key) is true in Java (verified by iterating over the store)
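The failed lookups in TEST3/TEST4 and the duplicate entry in TEST5 would all be consistent with the store comparing keys as raw serialized bytes rather than as deserialized records. A minimal sketch of that idea, assuming the Confluent wire format (magic byte 0x0, then a 4-byte big-endian schema ID, then the Avro payload); the schema IDs and payload bytes below are made-up stand-ins, not values from my registry:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ByteKeyedStoreSketch {

    // Hypothetical registry-assigned IDs for EventKey schema @1 and @2.
    static final int SCHEMA_ID_V1 = 21;
    static final int SCHEMA_ID_V2 = 22;

    // Confluent wire format: magic byte 0x0, 4-byte big-endian schema ID,
    // then the Avro binary payload.
    static byte[] wireFormat(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(5 + avroPayload.length)
                .put((byte) 0)
                .putInt(schemaId)
                .put(avroPayload)
                .array();
    }

    public static void main(String[] args) {
        // Made-up stand-ins for the key's Avro payload under each schema.
        byte[] k1 = wireFormat(SCHEMA_ID_V1, new byte[]{0x1e});
        byte[] k2 = wireFormat(SCHEMA_ID_V2, new byte[]{0x1e, 0x00});

        // A byte-keyed store (as RocksDB is) sees two distinct keys, even
        // though the deserialized records would be equal in Java.
        Map<String, String> store = new HashMap<>();
        store.put(Arrays.toString(k1), "v@1");
        store.put(Arrays.toString(k2), "v@2");

        System.out.println("byte-equal keys: " + Arrays.equals(k1, k2)); // false
        System.out.println("entries: " + store.size());                  // 2
    }
}
```

Under that assumption, a lookup serialized with schema @2 can never hit an entry written with schema @1, and a record re-produced under schema @2 lands as a second entry.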
Reading from the upstream raw topic throughout the testing:
{code:java}
/tmp$ kafka-avro-console-consumer --topic topic-test-5 --bootstrap-server
localhost:9092 --property schema.registry.url=http://127.0.0.1:8081 --property
print.key=true --from-beginning
{"keyAttribute1":"key-attribute-1"} {"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1"} {"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1"} {"valueAttribute1":"value-1"}
{"keyAttribute1":"key-attribute-1","keyAttribute2":null} {"valueAttribute1":"value-1"}
{code}
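Note also that the last key above (written with schema @2) differs from the others at the Avro binary level, independent of any schema-ID prefix: the ["null","string"] union for keyAttribute2 is encoded as a branch index even when null. A hand-rolled sketch of the Avro binary encoding per the spec (zig-zag varint length plus UTF-8 bytes for strings, branch 0 for the null union member) — this is not the Avro library, just an illustration:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class AvroKeyEncoding {

    // Zig-zag varint, as used by Avro binary encoding for longs.
    static void writeLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63);
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro string: length as zig-zag varint, then UTF-8 bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] b = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, b.length);
        out.write(b, 0, b.length);
    }

    // EventKey under schema @1: just keyAttribute1.
    static byte[] encodeV1(String keyAttribute1) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, keyAttribute1);
        return out.toByteArray();
    }

    // EventKey under schema @2: keyAttribute1, then keyAttribute2 as a
    // ["null","string"] union; null is written as branch index 0.
    static byte[] encodeV2(String keyAttribute1) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, keyAttribute1);
        writeLong(out, 0); // union branch 0 = null
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] v1 = encodeV1("key-attribute-1");
        byte[] v2 = encodeV2("key-attribute-1");
        System.out.println("v1 payload length: " + v1.length); // 16
        System.out.println("v2 payload length: " + v2.length); // 17
        System.out.println("byte-equal: " + Arrays.equals(v1, v2)); // false
    }
}
```

So even before considering the schema-ID prefix, the two logically-equal keys serialize to different byte sequences.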
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)