Gordon Wang created KAFKA-10252:
-----------------------------------

             Summary: MeteredTimestampedKeyValueStore not setting up correct 
topic name for GlobalKTable associating StateStores
                 Key: KAFKA-10252
                 URL: https://issues.apache.org/jira/browse/KAFKA-10252
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.4.1
            Reporter: Gordon Wang


When creating a GlobalKTable for getting a ReadOnlyKeyValueStore likw below:
{code}
GlobalKTable<GenericRecord, GenericRecord> globalTable = 
streamsBuilder.globalTable(topic,
                Consumed.with(keySerde, valueSerde),
                Materialized.as(Stores.inMemoryKeyValueStore(topic)));
{code}
I got StreamsException like below:
{code}
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro 
schema for topic applicationId-sourceTopicName-changelog
Caused by: 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
Subject not found.; error code: 40401
{code}
But as seen in GlobalKTable Java Doc the changelog stream shall not be created 
and in fact was not created.This leads to our custom serde to be searching for 
schema (we are using Confluent Platform and Avro based schema registry for the 
job) using a wrong topic name (should just be sourceTopicName rather than 
applicationId-sourceTopicName-changelog).
After digging into the code, I found *initStoreSerde* method in 
*MeteredTimestampedKeyValueStore* would assume the topic backing the store 
would always be storeChangelogTopic when initializing the Serdes for the state 
store, I think for GlobalKTables (ones having a GlobalProcessorContextImpl 
ProcessorContext) we shall use the original topic name directly here.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to