[ https://issues.apache.org/jira/browse/KAFKA-5398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050225#comment-16050225 ]

Benjamin Bargeton edited comment on KAFKA-5398 at 6/15/17 11:39 AM:
--------------------------------------------------------------------

For sure, but the issue can still happen with the same serdes.
Let's say we have:
* a {{KStream}} backed by a topic ({{stream-topic}}) fed by application {{A}}
* a {{KTable}} or a {{GlobalKTable}} backed by another topic ({{table-topic}}) fed by application {{B}}
* {{table-topic}} was written by application {{B}} with an Avro-serialized key (schema {{v1}})

Now let's say application {{B}} updates its schema to {{v2}} (e.g. a new field with a default value).
{{table-topic}} now contains a mix of {{v1}}-serialized and {{v2}}-serialized keys.
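For illustration, a hypothetical {{v2}} of the key schema could simply add a field with a default (reusing the {{AvroKey}} schema from the issue description below; {{newfield}} is made up):
{code:javascript}
{
  "type": "record",
  "name": "AvroKey",
  "namespace": "com.test.key",
  "fields": [
    { "name": "anyfield", "type": "string" },
    { "name": "newfield", "type": "string", "default": "" }
  ]
}
{code}
Both versions are compatible, so application {{B}} can keep producing to {{table-topic}}, but the registry assigns the new schema its own id.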

In our stream app, starting from the {{KStream}}, we can choose to use either {{v1}} or {{v2}} to produce a key (let's say {{v2}}), either for the {{selectKey}} operation prior to the {{join}} when using a {{KTable}}, or for the {{join}} directly when using a {{GlobalKTable}} (see the sketch after the list):
* In the {{KTable}} case the join will fail because the re-partitioning will not be equivalent: the produced key is always in {{v2}}, whereas {{table-topic}} contains both {{v1}} and {{v2}}.
* In the {{GlobalKTable}} case it will fail too, as described in the issue, because the {{byte[]}} from {{table-topic}} is compared directly against the bytes of the produced key.
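A rough sketch of the two topologies (0.10.2 API; {{Order}}, {{Customer}} and the {{enrich}} helper are made-up names, not from the issue):
{code:java}
KStreamBuilder builder = new KStreamBuilder();
KStream<String, Order> stream = builder.stream("stream-topic");

// KTable variant: selectKey triggers a re-partition topic whose keys are
// always written with the app's (v2) schema id, while table-topic still
// holds a mix of v1- and v2-encoded keys.
KTable<AvroKey, Customer> table = builder.table("table-topic", "table-store");
stream.selectKey((k, order) -> new AvroKey(order.getCustomerId()))
      .join(table, (order, customer) -> enrich(order, customer));

// GlobalKTable variant: the mapped key is serialized with v2 and its raw
// bytes are looked up directly against the bytes stored from table-topic.
GlobalKTable<AvroKey, Customer> global = builder.globalTable("table-topic", "global-store");
stream.join(global,
            (k, order) -> new AvroKey(order.getCustomerId()),
            (order, customer) -> enrich(order, customer));
{code}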

Regarding the Confluent Schema Registry, the problem is amplified because it puts the schema id in the byte representation, so even adding a custom attribute to the schema (like {{avro.java.string}}, which won't even be serialized) will change the produced bytes.
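A quick way to see the effect (rough sketch; assumes the standard Confluent {{KafkaAvroSerializer}} and a reachable registry, with {{schemaV1}}/{{schemaV2}} standing for the two parsed key schemas):
{code:java}
// Same logical key, serialized under two registered schema versions.
KafkaAvroSerializer serializer = new KafkaAvroSerializer();
serializer.configure(
        Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
        true); // true = configure as a key serializer

GenericRecord keyV1 = new GenericData.Record(schemaV1);
keyV1.put("anyfield", "42");
GenericRecord keyV2 = new GenericData.Record(schemaV2);
keyV2.put("anyfield", "42");
keyV2.put("newfield", ""); // the writer must fill the new field itself

byte[] b1 = serializer.serialize("table-topic", keyV1);
byte[] b2 = serializer.serialize("table-topic", keyV2);
System.out.println(Arrays.equals(b1, b2)); // false: bytes 1-4 hold different schema ids
{code}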

How would you manage those cases? 


> Joins on GlobalKTable don't work properly when combined with Avro and the 
> Confluent Schema Registry
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: Kafka, Avro, Confluent Schema Registry (3.2.1)
>            Reporter: Benjamin Bargeton
>
> Joins between a {{KStream}} and a {{GlobalKTable}} are not working as expected 
> when using the following setup:
> * Use Kafka in combination with the Confluent Schema Registry
> * Feed a topic ({{my-global-topic}}) that will be used as a {{GlobalKTable}} 
> input by posting some messages with an Avro {{GenericRecord}} as the key 
> (using a traditional {{Producer}}/{{ProducerRecord}} for example).
> The dummy Avro schema for the example:
> {code:javascript}
> {
>   "type": "record",
>   "name": "AvroKey",
>   "namespace": "com.test.key",
>   "fields": [
>     {
>       "name": "anyfield",
>       "type": "string"
>     }
>   ]
> }
> {code}
> * Start a Kafka Streams process that, this time, processes messages using an 
> Avro {{SpecificRecord}} ({{AvroKey}}) generated by the Avro compiler for the 
> same schema:
> {code:java}
> KStream<AnyKey, AnyObject> stream = builder.stream("my-stream-topic");
> GlobalKTable<AvroKey, AnyObject> globalTable =
>         builder.globalTable("my-global-topic", "my-global-topic-store");
> stream
>         .leftJoin(globalTable,
>                   (k, v) -> new AvroKey(v.getKeyOfTheGlobalTable()),
>                   (v1, v2) -> v2 /* the ValueJoiner */)
>         .print("Result");
> {code}
> Note that the schema generated by Avro for the {{SpecificRecord}} differs 
> slightly from the original one because we use {{String}} instead of 
> {{CharSequence}} (an Avro compiler option):
> {code:javascript}
> {
>   "type": "record",
>   "name": "AvroKey",
>   "namespace": "com.test.key",
>   "fields": [
>     {
>       "name": "anyfield",
>       "type": {
>         "type": "string",
>         "avro.java.string": "String"
>       }
>     }
>   ]
> }
> {code}
> * Last but not least, the Confluent serializer uses bytes 1-4 of the 
> serialized payload to store the id of the schema registered in the Schema 
> Registry:
> http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
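> A minimal sketch of reading that header (layout per the linked docs; 
> {{serializedKey}} is any key produced by the Confluent serializer):
> {code:java}
> ByteBuffer buf = ByteBuffer.wrap(serializedKey);
> byte magic = buf.get();       // byte 0: magic byte, always 0
> int schemaId = buf.getInt();  // bytes 1-4: schema id, big-endian
> // the Avro binary payload follows; two encodings of the same logical
> // key under different schema ids can therefore never be byte-equal
> {code}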
> Now our issue is that when the {{RocksDBStore}} backing the {{GlobalKTable}} 
> is initialized, it uses the {{byte[]}} straight from the key:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L179
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L164
> The producer's and the stream app's schemas differ slightly (but are 
> compatible), so they are registered under different global ids.
> Since the id is embedded in the binary representation, the lookup fails 
> during the join.
> I didn't test it, but the issue is probably broader than just this case: an 
> upstream producer performing a schema evolution (with a backwards-compatible 
> change) should lead to the same problem.
> Please note that when using a {{KTable}} instead of a {{GlobalKTable}}, it 
> works fine, because the key is first deserialized and then reserialized using 
> the current serdes:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L197
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L198
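> In other words, the {{KTable}} path effectively normalizes the stored bytes 
> (rough sketch; {{keySerde}} stands for the serde configured in the app):
> {code:java}
> // deserialize with the app's serde, then serialize again: the stored
> // bytes always carry the app's own schema id, whatever the input was
> AvroKey key = keySerde.deserializer().deserialize(topic, rawKeyBytes);
> byte[] normalized = keySerde.serializer().serialize(topic, key);
> {code}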
> To conclude, I'm not sure I fully understand yet how all the pieces connect 
> together for state stores, but I assume that for a {{GlobalKTable}} each key 
> should also go through a deserialization/reserialization step before being 
> stored in RocksDB (at least to make {{KTable}} and {{GlobalKTable}} behavior 
> consistent).


