[ https://issues.apache.org/jira/browse/KAFKA-5398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049190#comment-16049190 ]
Benjamin Bargeton commented on KAFKA-5398:
------------------------------------------

Actually I continued to dig into Kafka Streams, and I'm having the same kind of issue with operations that trigger an internal re-partitioning. For example, when joining a KStream with a KTable (after a selectKey on the stream), if the source topic of the KTable was written with a key that is equal to the key selected for the KStream, but with a different serialization (in my case a different schema id), the partition distribution will not match, and thus the join will not work properly. Am I missing something, or is this a real issue?

> Joins on GlobalKTable don't work properly when combined with Avro and the
> Confluent Schema Registry
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: Kafka, Avro, Confluent Schema Registry (3.2.1)
>            Reporter: Benjamin Bargeton
>
> Joins between a {{KStream}} and a {{GlobalKTable}} are not working as expected
> when using the following setup:
> * Use Kafka in combination with the Confluent Schema Registry
> * Feed a topic ({{my-global-topic}}) that will be used as the {{GlobalKTable}}
> input by posting some messages with an Avro {{GenericRecord}} as the key
> (using a traditional {{Producer/ProducerRecord}} for example).
> The simple Avro schema used for the example:
> {code:javascript}
> {
>   "type": "record",
>   "name": "AvroKey",
>   "namespace": "com.test.key",
>   "fields": [
>     {
>       "name": "anyfield",
>       "type": "string"
>     }
>   ]
> }
> {code}
> * Start a Kafka Streams process that processes messages, this time using an Avro
> {{SpecificRecord}} ({{AvroKey}}) generated by the Avro compiler for the same
> schema:
> {code:java}
> KStream<AnyKey, AnyObject> stream = builder.stream("my-stream-topic");
> GlobalKTable<AvroKey, AnyObject> globalTable =
>     builder.globalTable("my-global-topic", "my-global-topic-store");
> stream
>     .leftJoin(globalTable,
>               (k, v) -> new AvroKey(v.getKeyOfTheGlobalTable()),
>               (v1, v2) -> /* the ValueJoiner */)
>     .print("Result");
> {code}
> Note that the schema generated by Avro for the {{SpecificRecord}} differs slightly
> from the original one because we use {{String}} instead of {{CharSequence}}
> (Avro config):
> {code:javascript}
> {
>   "type": "record",
>   "name": "AvroKey",
>   "namespace": "com.test.key",
>   "fields": [
>     {
>       "name": "anyfield",
>       "type": {
>         "type": "string",
>         "avro.java.string": "String"
>       }
>     }
>   ]
> }
> {code}
> * Last but not least, the Confluent Schema Registry uses bytes 1-4 of the
> Avro-serialized object to store the id of the schema registered in the
> schema registry:
> http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
> Now our issue is that when the {{RocksDBStore}} of the {{GlobalKTable}} is
> initialized, it uses the {{byte[]}} straight from the key:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L179
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L164
> The schemas for the producer and the stream app differ slightly (but are
> compatible), so they are registered with different global ids.
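The byte-level consequence of the wire format described above can be sketched in plain Java, without Kafka or Avro on the classpath. The schema ids 7 and 8, the payload, and the {{frame}} helper are all hypothetical stand-ins for what the Confluent serializer does; the point is only that two framings of the same logical key are not byte-equal:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormatDemo {
    // Prepends the Confluent wire-format header (magic byte 0x0, then the
    // 4-byte schema id, big-endian) to an already-encoded payload.
    // Hypothetical helper for illustration; the real work is done by
    // KafkaAvroSerializer.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0x0)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        // Same logical key, identically Avro-encoded...
        byte[] payload = "anyfield-value".getBytes();
        // ...but registered under two different (compatible) schema ids.
        byte[] producerKey = frame(7, payload); // hypothetical producer schema id
        byte[] streamsKey  = frame(8, payload); // hypothetical Streams-app schema id
        // The raw byte[] forms differ in bytes 1-4, so a byte-keyed RocksDB
        // lookup (or any hash over the bytes) treats them as distinct keys.
        System.out.println(Arrays.equals(producerKey, streamsKey)); // false
    }
}
```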
> Since the id is contained in the binary representation, the lookup will fail
> during the join.
> I didn't test it, but the issue is probably broader than just this case: if
> we have an upstream producer that is doing a schema evolution (with a
> backwards-compatible change), it should lead to the same issue.
> Please note that when using a {{KTable}} instead of a {{GlobalKTable}} it
> works fine, because the key is first deserialized and then reserialized using
> the current serdes:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L197
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L198
> To conclude, I'm not sure I fully understand yet how all the pieces connect
> together for state stores, but I assume that for a {{GlobalKTable}} there
> should also be a deserialization/reserialization of each key before storing
> it in RocksDB (at least to make the {{KTable}} and {{GlobalKTable}} behavior
> consistent).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
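The deserialize-then-reserialize behavior the reporter describes for {{KTable}} can be sketched in isolation. This is not the actual Kafka Streams code path: a real serde would decode the Avro payload with the writer's schema and re-encode it with the reader's schema, while the hypothetical {{normalize}} below collapses that to swapping the 5-byte wire-format header. It only illustrates why re-framing through the current serde makes store keys byte-identical again:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class KeyNormalizeDemo {
    static final int HEADER = 5; // magic byte + 4-byte schema id

    // Illustrative stand-in for "deserialize with the writer's schema,
    // reserialize with the reader's schema": keep the payload, replace
    // the wire-format header with the reader's own schema id.
    static byte[] normalize(byte[] wireBytes, int readerSchemaId) {
        byte[] payload = Arrays.copyOfRange(wireBytes, HEADER, wireBytes.length);
        return ByteBuffer.allocate(HEADER + payload.length)
                .put((byte) 0x0)
                .putInt(readerSchemaId)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        byte[] payload = "anyfield-value".getBytes();
        // Same logical key written under two hypothetical schema ids.
        byte[] producerKey = ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0x0).putInt(7).put(payload).array();
        byte[] streamsKey = ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0x0).putInt(8).put(payload).array();
        // After normalizing both sides to the reader's schema id, the key
        // stored in RocksDB and the key used for the join lookup match.
        System.out.println(Arrays.equals(normalize(producerKey, 8),
                                         normalize(streamsKey, 8))); // true
    }
}
```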