Benjamin Bargeton created KAFKA-5398: ----------------------------------------
Summary: Joins on GlobalKTable don't work properly when combined with Avro and the Confluent Schema Registry
Key: KAFKA-5398
URL: https://issues.apache.org/jira/browse/KAFKA-5398
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.1, 0.10.2.0
Environment: Kafka, Avro, Confluent Schema Registry (3.2.1)
Reporter: Benjamin Bargeton

Joins between a {{KStream}} and a {{GlobalKTable}} do not work as expected with the following setup:

* Use Kafka in combination with the Confluent Schema Registry.
* Feed a topic ({{my-global-topic}}), which will be used as the {{GlobalKTable}} input, by posting messages whose key is an Avro {{GenericRecord}} (using a plain {{Producer}}/{{ProducerRecord}}, for example). The dummy Avro schema for this example:
{code:javascript}
{
  "type": "record",
  "name": "AvroKey",
  "namespace": "com.test.key",
  "fields": [
    { "name": "anyfield", "type": "string" }
  ]
}
{code}
* Start a Kafka Streams application that processes messages, this time using an Avro {{SpecificRecord}} ({{AvroKey}}) generated by the Avro compiler from the same schema:
{code:java}
KStream<AnyKey, AnyObject> stream = builder.stream("my-stream-topic");
GlobalKTable<AvroKey, AnyObject> globalTable = builder.globalTable("my-global-topic", "my-global-topic-store");

stream
    .leftJoin(globalTable,
              (k, v) -> new AvroKey(v.getKeyOfTheGlobalTable()),
              (v1, v2) -> /*the ValueJoiner*/)
    .print("Result");
{code}
Note that the schema generated by Avro for the {{SpecificRecord}} differs slightly from the original one, because we configure Avro to use String instead of CharSequence:
{code:javascript}
{
  "type": "record",
  "name": "AvroKey",
  "namespace": "com.test.key",
  "fields": [
    {
      "name": "anyfield",
      "type": { "type": "string", "avro.java.string": "String" }
    }
  ]
}
{code}
* Last but not least, the Confluent Schema Registry serializer writes the ID of the registered schema into bytes 1-4 of the serialized Avro object (byte 0 is a magic byte).
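To make the schema-ID point concrete, here is a minimal, self-contained sketch in plain Java (no Kafka or Avro libraries) of the Confluent framing: a magic byte, the 4-byte big-endian schema ID, then the Avro binary payload. The class name, helper methods, and the schema IDs 1 and 2 are hypothetical, and the Avro encoding is hand-rolled for this one-string-field record only. It relies on the fact that the {{avro.java.string}} property only affects Java code generation, not the binary encoding, so the two serialized keys differ only in their ID bytes.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class WireFormatDemo {
    static final byte MAGIC_BYTE = 0x0;

    // Avro binary encoding of a long: zig-zag, then variable-length base-128.
    static void writeAvroLong(ByteArrayOutputStream out, long n) {
        long z = (n << 1) ^ (n >> 63); // zig-zag encode
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80));
            z >>>= 7;
        }
        out.write((int) z);
    }

    // Avro binary encoding of the AvroKey record (one string field):
    // the UTF-8 byte length as an Avro long, followed by the UTF-8 bytes.
    static byte[] avroPayload(String anyfield) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] utf8 = anyfield.getBytes(StandardCharsets.UTF_8);
        writeAvroLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
        return out.toByteArray();
    }

    // Confluent framing: magic byte, 4-byte big-endian schema ID, Avro payload.
    static byte[] confluentFrame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(1 + 4 + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        byte[] payload = avroPayload("foo");
        byte[] fromProducer = confluentFrame(1, payload);   // hypothetical ID of the producer's schema
        byte[] fromStreamsApp = confluentFrame(2, payload); // hypothetical ID of the generated schema

        // Whole keys differ, because bytes 1-4 carry different schema IDs.
        System.out.println(Arrays.equals(fromProducer, fromStreamsApp)); // false

        // The Avro payloads after the 5-byte header are identical.
        System.out.println(Arrays.equals(
                Arrays.copyOfRange(fromProducer, 5, fromProducer.length),
                Arrays.copyOfRange(fromStreamsApp, 5, fromStreamsApp.length))); // true
    }
}
```

Both byte arrays decode to the same logical key, yet anything that compares raw {{byte[]}} keys treats them as two different keys.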
The wire format is documented at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format

Our issue is that when the {{RocksDBStore}} of the {{GlobalKTable}} is initialized, it uses the key {{byte[]}} exactly as read from the topic:
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L179
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L164

The schemas used by the producer and by the Streams application differ slightly (but are compatible), so they are registered under different global IDs. Since the ID is part of the key's binary representation, the lookup fails during the join.

I didn't test it, but the issue is probably broader than this case: an upstream producer that evolves its schema (with a backwards-compatible change) should lead to the same issue.

Note that with a {{KTable}} instead of a {{GlobalKTable}} it works fine, because the key is first deserialized and then re-serialized using the current serdes:
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L197
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L198

To conclude, I'm not sure I fully understand yet how all the pieces fit together for state stores, but I assume that for a {{GlobalKTable}} each key should also be deserialized and re-serialized before being stored in RocksDB (at least to make {{KTable}} and {{GlobalKTable}} behavior consistent).
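The deserialize/re-serialize step suggested for {{GlobalKTable}} can be sketched as follows. This is a simplified model, not Kafka Streams code: a {{TreeMap}} with an unsigned lexicographic comparator stands in for the {{byte[]}}-keyed RocksDB store, the "deserializer" just skips the 5-byte header (the real Confluent deserializer uses the ID to fetch the writer schema from the registry), the payload is plain UTF-8 instead of Avro, and all class names, method names, and schema IDs are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

public class GlobalStoreRestoreSketch {
    // Hypothetical registry IDs for the producer's schema and the app's generated schema.
    static final int PRODUCER_SCHEMA_ID = 1;
    static final int STREAMS_SCHEMA_ID = 2;

    // Simplified Confluent-style framing: magic byte 0, 4-byte schema ID, UTF-8 payload.
    static byte[] serialize(int schemaId, String key) {
        byte[] payload = key.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0).putInt(schemaId).put(payload).array();
    }

    // Simplified deserializer: skip the 5-byte header and decode the payload.
    static String deserialize(byte[] bytes) {
        return new String(bytes, 5, bytes.length - 5, StandardCharsets.UTF_8);
    }

    // Unsigned lexicographic byte comparison, as a byte[]-keyed store would use.
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    }

    static Map<byte[], String> newStore() {
        return new TreeMap<>(GlobalStoreRestoreSketch::compareBytes);
    }

    // Behavior described in the report: key bytes go into the store unchanged.
    static Map<byte[], String> restoreRaw(Map<byte[], String> topic) {
        Map<byte[], String> store = newStore();
        store.putAll(topic);
        return store;
    }

    // Behavior proposed in the report: deserialize each key, then re-serialize it
    // with the app's own serde, so every stored key carries STREAMS_SCHEMA_ID.
    static Map<byte[], String> restoreNormalized(Map<byte[], String> topic) {
        Map<byte[], String> store = newStore();
        for (Map.Entry<byte[], String> e : topic.entrySet()) {
            store.put(serialize(STREAMS_SCHEMA_ID, deserialize(e.getKey())), e.getValue());
        }
        return store;
    }

    public static void main(String[] args) {
        Map<byte[], String> topic = newStore();
        topic.put(serialize(PRODUCER_SCHEMA_ID, "foo"), "global-value");

        // The join looks the key up after serializing it with the app's serde.
        byte[] lookupKey = serialize(STREAMS_SCHEMA_ID, "foo");

        System.out.println(restoreRaw(topic).get(lookupKey));        // null
        System.out.println(restoreNormalized(topic).get(lookupKey)); // global-value
    }
}
```

The normalized restore costs one deserialize/serialize round trip per record, which is the same work the report says the {{KTable}} path already does.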