[ 
https://issues.apache.org/jira/browse/KAFKA-5398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049190#comment-16049190
 ] 

Benjamin Bargeton commented on KAFKA-5398:
------------------------------------------

Actually I continued to dig into Kafka Streams, and I'm hitting the same kind of 
issue with operations that trigger an internal repartitioning.
For example, when joining a KStream with a KTable (after a selectKey on the 
stream), if the source topic of the KTable was written with a key equal to the 
key selected for the KStream, but with a different serialization (in my case a 
different schema id), the partition distribution will not match, and thus the 
join will not work properly.
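
To make the co-partitioning problem concrete, here is a toy sketch (the 
partitioner stand-in and the 5-byte framing helper are mine, not Kafka's actual 
murmur2-based partitioner): because the partition is derived from the 
*serialized* key bytes, the same logical key framed with two different schema 
ids generally lands on different partitions.

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionSketch {

    // Stand-in for Kafka's partitioner, which hashes the *serialized* key bytes
    // (Kafka really uses murmur2; any byte-sensitive hash shows the problem).
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Confluent wire format: magic byte 0, 4-byte big-endian schema id, Avro body.
    static byte[] framed(int schemaId, byte[] avroBody) {
        byte[] out = new byte[5 + avroBody.length];
        out[0] = 0;
        out[1] = (byte) (schemaId >>> 24);
        out[2] = (byte) (schemaId >>> 16);
        out[3] = (byte) (schemaId >>> 8);
        out[4] = (byte) schemaId;
        System.arraycopy(avroBody, 0, out, 5, avroBody.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] body = "anyfield-value".getBytes(StandardCharsets.UTF_8);
        byte[] producerKey = framed(42, body); // producer's registered schema id
        byte[] streamsKey  = framed(43, body); // streams app's (compatible) schema id

        // Same Avro body, different header bytes => different partitions.
        System.out.println("producer -> partition " + partitionFor(producerKey, 8));
        System.out.println("streams  -> partition " + partitionFor(streamsKey, 8));
    }
}
{code}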

Am I missing something or is it a real issue?


> Joins on GlobalKTable don't work properly when combined with Avro and the 
> Confluent Schema Registry
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: Kafka, Avro, Confluent Schema Registry (3.2.1)
>            Reporter: Benjamin Bargeton
>
> A join between a {{KStream}} and a {{GlobalKTable}} does not work as expected 
> with the following setup:
> * Use Kafka in combination with the Confluent Schema Registry
> * Feed a topic ({{my-global-topic}}) that will be used as the {{GlobalKTable}} 
> input by posting some messages with an Avro {{GenericRecord}} as the key 
> (using a plain {{Producer}}/{{ProducerRecord}}, for example).
> The trivial Avro schema for the example:
> {code:javascript}
> {
>   "type": "record",
>   "name": "AvroKey",
>   "namespace": "com.test.key",
>   "fields": [
>     {
>       "name": "anyfield",
>       "type": "string"
>     }
>   ]
> }
> {code}
> * Start a Kafka Streams application that processes messages, this time using 
> an Avro {{SpecificRecord}} ({{AvroKey}}) generated by the Avro compiler from 
> the same schema:
> {code:java}
> KStream<AnyKey, AnyObject> stream = builder.stream("my-stream-topic");
> GlobalKTable<AvroKey, AnyObject> globalTable =
>         builder.globalTable("my-global-topic", "my-global-topic-store");
> stream
>         .leftJoin(globalTable,
>                 (k, v) -> new AvroKey(v.getKeyOfTheGlobalTable()),
>                 (v1, v2) -> /* the ValueJoiner */)
>         .print("Result");
> {code}
> Note that the schema generated by Avro for the {{SpecificRecord}} differs 
> slightly from the original one, because we use {{String}} instead of 
> {{CharSequence}} (an Avro compiler setting):
> {code:javascript}
> {
>   "type": "record",
>   "name": "AvroKey",
>   "namespace": "com.test.key",
>   "fields": [
>     {
>       "name": "anyfield",
>       "type": {
>         "type": "string",
>         "avro.java.string": "String"
>       }
>     }
>   ]
> }
> {code}
> * Last but not least, the Confluent Schema Registry serializer writes the id 
> of the schema registered in the registry into bytes 1-4 of the serialized 
> payload (byte 0 is a magic byte):
> http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
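> A tiny sketch of that wire format (helper names are mine, nothing from the 
> Confluent libraries) for readers who haven't seen it:
> {code:java}
> import java.nio.ByteBuffer;
> 
> public class WireFormat {
> 
>     // Byte 0 is a magic byte (0); bytes 1-4 are the big-endian schema id;
>     // the Avro-encoded payload follows.
>     static int schemaIdOf(byte[] serialized) {
>         ByteBuffer buf = ByteBuffer.wrap(serialized);
>         if (buf.get() != 0) {
>             throw new IllegalArgumentException("Unknown magic byte");
>         }
>         return buf.getInt();
>     }
> 
>     public static void main(String[] args) {
>         byte[] record = {0, 0, 0, 0, 7, 'k', 'e', 'y'}; // schema id 7 + fake payload
>         System.out.println("schema id = " + schemaIdOf(record)); // prints "schema id = 7"
>     }
> }
> {code}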
> Now our issue is that when the {{RocksDBStore}} backing the {{GlobalKTable}} 
> is initialized, it uses the key {{byte[]}} straight from the record:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L179
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L164
> The producer's and the streams app's schemas differ slightly (but are 
> compatible), so they are registered under different global ids.
> Since the id is embedded in the binary representation, the lookup fails 
> during the join.
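> A toy stand-in for that store (a plain map keyed on the raw bytes, with a fake 
> numeric prefix playing the role of the schema-id framing; all names are mine) 
> makes the failed lookup visible:
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.util.Base64;
> import java.util.HashMap;
> import java.util.Map;
> 
> public class RawByteLookup {
> 
>     // The schema-id framing is faked with a numeric prefix; what matters is
>     // that the store compares keys byte-for-byte, as RocksDB does.
>     static byte[] framed(int schemaId, String logicalKey) {
>         return (schemaId + "|" + logicalKey).getBytes(StandardCharsets.UTF_8);
>     }
> 
>     public static void main(String[] args) {
>         Map<String, String> store = new HashMap<>(); // Base64 stands in for byte[] keys
> 
>         // Loaded from my-global-topic: key serialized under the producer's schema id.
>         store.put(Base64.getEncoder().encodeToString(framed(42, "anyfield=x")), "value");
> 
>         // Lookup from the stream side: same logical key, streams app's schema id.
>         String hit = store.get(Base64.getEncoder().encodeToString(framed(43, "anyfield=x")));
>         System.out.println(hit); // prints "null": the join produces no match
>     }
> }
> {code}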
> I didn't test it, but the issue is probably broader than just this case: if 
> an upstream producer performs a schema evolution (with a backward-compatible 
> change), it should lead to the same issue.
> Please note that when using a {{KTable}} instead of a {{GlobalKTable}} it 
> works fine, because the key is first deserialized and then reserialized using 
> the current serdes:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L197
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L198
> To conclude, I'm not sure I fully understand yet how all the pieces fit 
> together for state stores, but I assume that for a {{GlobalKTable}} each key 
> should also go through a deserialization/reserialization before being stored 
> in RocksDB (at least to make the {{KTable}} and {{GlobalKTable}} behaviors 
> consistent).
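> The round-trip I have in mind would look roughly like this (a toy serde with 
> a fake schema-id prefix, not the real Streams internals):
> {code:java}
> import java.nio.charset.StandardCharsets;
> import java.util.Arrays;
> 
> public class KeyNormalization {
> 
>     // Toy wire form "<schemaId>|<logical key>"; real messages carry the id
>     // in bytes 1-4 of the payload instead.
>     static String deserialize(byte[] raw) {
>         return new String(raw, StandardCharsets.UTF_8).split("\\|", 2)[1];
>     }
> 
>     static byte[] serialize(int schemaId, String logicalKey) {
>         return (schemaId + "|" + logicalKey).getBytes(StandardCharsets.UTF_8);
>     }
> 
>     // What the KTable path effectively does before writing to its store:
>     // deserialize with the app's serde, then reserialize under the app's id.
>     static byte[] normalize(byte[] raw, int appSchemaId) {
>         return serialize(appSchemaId, deserialize(raw));
>     }
> 
>     public static void main(String[] args) {
>         byte[] fromProducer = serialize(42, "anyfield=x");
>         byte[] fromStreams  = serialize(43, "anyfield=x");
> 
>         // Raw bytes differ...
>         System.out.println(Arrays.equals(fromProducer, fromStreams)); // false
>         // ...but after the round-trip under the app's id, both sides match
>         // and a byte-for-byte RocksDB lookup can succeed.
>         System.out.println(Arrays.equals(normalize(fromProducer, 43), fromStreams)); // true
>     }
> }
> {code}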



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
