Benjamin Bargeton created KAFKA-5398:
----------------------------------------

             Summary: Joins on GlobalKTable don't work properly when combined 
with Avro and the Confluent Schema Registry
                 Key: KAFKA-5398
                 URL: https://issues.apache.org/jira/browse/KAFKA-5398
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 0.10.2.1, 0.10.2.0
         Environment: Kafka, Avro, Confluent Schema Registry (3.2.1)

            Reporter: Benjamin Bargeton


A join between a {{KStream}} and a {{GlobalKTable}} does not work as expected 
when using the following setup:
* Use Kafka in combination with the Confluent Schema Registry
* Feed a topic ({{my-global-topic}}) that will be used as the {{GlobalKTable}} 
input by posting some messages with an Avro {{GenericRecord}} as the key (using 
a traditional {{Producer/ProducerRecord}}, for example).
A trivial Avro schema for the example:
{code:javascript}
{
  "type": "record",
  "name": "AvroKey",
  "namespace": "com.test.key",
  "fields": [
    {
      "name": "anyfield",
      "type": "string"
    }
  ]
}
{code}

* Start a Kafka Streams application that processes messages, this time using an 
Avro {{SpecificRecord}} ({{AvroKey}}) generated by the Avro compiler for the 
same schema:
{code:java}
KStream<AnyKey, AnyObject> stream = builder.stream("my-stream-topic");
GlobalKTable<AvroKey, AnyObject> globalTable =
        builder.globalTable("my-global-topic", "my-global-topic-store");
stream
        .leftJoin(globalTable,
                  (k, v) -> new AvroKey(v.getKeyOfTheGlobalTable()),
                  (v1, v2) -> /* the ValueJoiner */)
        .print("Result");
{code}
Note that the schema generated by Avro for the {{SpecificRecord}} differs 
slightly from the original one, because we use {{String}} instead of 
{{CharSequence}} (via the {{avro.java.string}} Avro config):
{code:javascript}
{
  "type": "record",
  "name": "AvroKey",
  "namespace": "com.test.key",
  "fields": [
    {
      "name": "anyfield",
      "type": {
        "type": "string",
        "avro.java.string": "String"
      }
    }
  ]
}
{code}

* Last but not least, the Confluent Schema Registry serializer uses bytes 1-4 
of the Avro-serialized object to store the id of the schema registered in the 
schema registry:
http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
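For reference, that wire format can be decoded with a few lines of plain Java; the sample bytes below are illustrative, not taken from a real registry:

```java
import java.nio.ByteBuffer;

public class WireFormat {
    // Confluent wire format: magic byte 0x0, then a 4-byte big-endian
    // schema id, then the Avro binary payload.
    static int schemaId(byte[] serialized) {
        ByteBuffer buf = ByteBuffer.wrap(serialized);
        if (buf.get() != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt(); // bytes 1-4 hold the registry schema id
    }

    public static void main(String[] args) {
        // Hypothetical serialized key: magic byte, schema id 42, payload bytes.
        byte[] key = {0x0, 0, 0, 0, 42, 0x6, 'f', 'o', 'o'};
        System.out.println(schemaId(key)); // prints 42
    }
}
```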

Now our issue is that when the {{RocksDBStore}} of the {{GlobalKTable}} is 
initialized, it uses the {{byte[]}} straight from the record key:
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L179
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L164

The producer's and the stream application's schemas differ slightly (but are 
compatible), so they are registered under different global ids.
Since the id is embedded in the binary representation, the key lookup fails 
during the join.
I didn't test it, but the issue is probably broader than just this case: an 
upstream producer performing a schema evolution (with a backward-compatible 
change) should lead to the same problem.

Please note that when using a {{KTable}} instead of a {{GlobalKTable}} it works 
fine, because the key is first deserialized and then reserialized using the 
current serdes:
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L197
https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L198

To conclude, I don't yet fully understand how all the pieces connect together 
for state stores, but I assume that for a {{GlobalKTable}} each key should also 
go through a deserialization/reserialization step before being stored in 
RocksDB (at least to make the {{KTable}} and {{GlobalKTable}} behaviors 
consistent).
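A minimal sketch of the normalization I mean, in plain Java (the header layout is from the wire-format doc above; the method name and ids are hypothetical): rewriting the schema id in both keys to the reader's own id makes them byte-equal again. In this toy example the payload encoding happens to be identical; in general the key would need a full deserialize/reserialize, as {{KTable}} already does:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class KeyNormalization {
    // Replace the 4-byte schema id of a wire-format key with the id the
    // reading application would use -- a cheap stand-in for the
    // deserialize/reserialize step that KTable already performs.
    static byte[] normalize(byte[] wireKey, int localSchemaId) {
        byte[] payload = Arrays.copyOfRange(wireKey, 5, wireKey.length);
        return ByteBuffer.allocate(5 + payload.length)
                .put((byte) 0x0)
                .putInt(localSchemaId)
                .put(payload)
                .array();
    }

    public static void main(String[] args) {
        byte[] producerKey = {0x0, 0, 0, 0, 1, 0x6, 'f', 'o', 'o'}; // schema id 1
        byte[] streamsKey  = {0x0, 0, 0, 0, 2, 0x6, 'f', 'o', 'o'}; // schema id 2
        // After normalizing to the local id, the raw bytes match again.
        System.out.println(Arrays.equals(normalize(producerKey, 2), streamsKey)); // prints true
    }
}
```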



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)