Gordon Wang created KAFKA-10252:
-----------------------------------

             Summary: MeteredTimestampedKeyValueStore not setting up correct topic name for GlobalKTable associating StateStores
                 Key: KAFKA-10252
                 URL: https://issues.apache.org/jira/browse/KAFKA-10252
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.4.1
            Reporter: Gordon Wang

When creating a GlobalKTable in order to get a ReadOnlyKeyValueStore, like below:

{code}
GlobalKTable<GenericRecord, GenericRecord> globalTable =
    streamsBuilder.globalTable(
        topic,
        Consumed.with(keySerde, valueSerde),
        Materialized.as(Stores.inMemoryKeyValueStore(topic)));
{code}

I got a StreamsException like below:

{code}
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for topic applicationId-sourceTopicName-changelog
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
{code}

But as stated in the GlobalKTable Javadoc, the changelog stream shall not be created, and in fact it was not created. As a result, our custom serde searches for the schema using the wrong topic name (we are using Confluent Platform and the Avro-based Schema Registry for the job). The topic name should be just sourceTopicName rather than applicationId-sourceTopicName-changelog.

After digging into the code, I found that the *initStoreSerde* method in *MeteredTimestampedKeyValueStore* assumes the topic backing the store is always the storeChangelogTopic when initializing the Serdes for the state store. I think that for GlobalKTables (the ones whose ProcessorContext is a GlobalProcessorContextImpl) we should use the original topic name directly here.
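
To make the naming mismatch concrete, here is a minimal, dependency-free sketch of the selection logic I have in mind. It is not the actual Kafka Streams code: the class SerdeTopicSketch and both of its methods are hypothetical names made up for illustration, and the changelog naming pattern is simply the one visible in the exception above (the store was named after the topic in my example).

```java
// Hypothetical illustration only; none of these names exist in Kafka Streams.
final class SerdeTopicSketch {

    private SerdeTopicSketch() { }

    // Assumed pattern, taken from the topic name in the exception:
    // <applicationId>-<storeName>-changelog
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Suggested behaviour: a global store has no changelog topic, so the
    // serdes should be initialized with the source topic name instead.
    static String serdeTopic(String applicationId,
                             String storeName,
                             String sourceTopic,
                             boolean globalStore) {
        return globalStore ? sourceTopic : changelogTopic(applicationId, storeName);
    }
}
```

With globalStore set to true, serdeTopic("applicationId", "sourceTopicName", "sourceTopicName", true) yields sourceTopicName, which is the subject our serde can actually resolve in the Schema Registry. With false it yields applicationId-sourceTopicName-changelog, the name that currently triggers the "Subject not found" error for global stores.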