[ https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Balaji Rao updated KAFKA-14070:
-------------------------------
    Description:
When using key-value state stores with the Processor API, one can add key-value state stores of arbitrary key types to a topology. This could lead to the method `queryMetadataForKey` in `KafkaStreams` being used with incorrect expectations.

In my understanding, `queryMetadataForKey` uses the source topics of the processor connected to the store to return the `KeyQueryMetadata`. This means that it could provide "incorrect" answers when used with key-value stores of arbitrary key types. The description of the method should be improved to make users aware of this pitfall.

Edit: Example Scala code
{code:scala}
val input = streamsBuilder.stream(
  "input-topic",
  Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
)

private val storeBuilder = Stores
  .keyValueStoreBuilder[String, String](
    Stores.inMemoryKeyValueStore("store"),
    Serdes.stringSerde,
    Serdes.stringSerde
  )

streamsBuilder.addStateStore(storeBuilder)

input.process(
  new ProcessorSupplier[Int, String, Void, Void] {
    override def get(): Processor[Int, String, Void, Void] =
      new Processor[Int, String, Void, Void] {
        var store: KeyValueStore[String, String] = _

        override def init(context: ProcessorContext[Void, Void]): Unit = {
          super.init(context)
          store = context.getStateStore("store")
        }

        override def process(record: Record[Int, String]): Unit = {
          ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
        }
      }
  },
  "store"
){code}

In the code sample above, the usage of `queryMetadataForKey` on `store` to find the

  was:
When using key-value state stores with the Processor API, one can add key-value state stores of arbitrary key types to a topology. This could lead to the method `queryMetadataForKey` in `KafkaStreams` being used with incorrect expectations.

In my understanding, `queryMetadataForKey` uses the source topics of the processor connected to the store to return the `KeyQueryMetadata`.
This means that it could provide "incorrect" answers when used with key-value stores of arbitrary key types. The description of the method should be improved to make users aware of this pitfall.


> Improve documentation for queryMetadataForKey
> ---------------------------------------------
>
>                 Key: KAFKA-14070
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14070
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.0
>            Reporter: Balaji Rao
>            Priority: Minor
>
> When using key-value state stores with the Processor API, one can add key-value
> state stores of arbitrary key types to a topology. This could lead to the
> method `queryMetadataForKey` in `KafkaStreams` being used with incorrect
> expectations.
> In my understanding, `queryMetadataForKey` uses the source topics of the
> processor connected to the store to return the `KeyQueryMetadata`. This means
> that it could provide "incorrect" answers when used with key-value stores of
> arbitrary key types. The description of the method should be improved to make
> users aware of this pitfall.
> Edit: Example Scala code
> {code:scala}
> val input = streamsBuilder.stream(
>   "input-topic",
>   Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
> )
>
> private val storeBuilder = Stores
>   .keyValueStoreBuilder[String, String](
>     Stores.inMemoryKeyValueStore("store"),
>     Serdes.stringSerde,
>     Serdes.stringSerde
>   )
>
> streamsBuilder.addStateStore(storeBuilder)
>
> input.process(
>   new ProcessorSupplier[Int, String, Void, Void] {
>     override def get(): Processor[Int, String, Void, Void] =
>       new Processor[Int, String, Void, Void] {
>         var store: KeyValueStore[String, String] = _
>
>         override def init(context: ProcessorContext[Void, Void]): Unit = {
>           super.init(context)
>           store = context.getStateStore("store")
>         }
>
>         override def process(record: Record[Int, String]): Unit = {
>           ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
>         }
>       }
>   },
>   "store"
> ){code}
>
> In the code sample above, the usage of `queryMetadataForKey` on `store` to
> find the

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
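
The pitfall described in the issue can be illustrated with a minimal, self-contained Scala sketch that does not use Kafka at all. It assumes, as the reporter's understanding suggests, that a key-based lookup derives the partition from the serialized lookup key alone. Everything here is hypothetical: `partitionFor` is a stand-in that uses `java.util.Arrays.hashCode` rather than Kafka's real partitioner hash, and the partition count of 4 and the record key 42 are made up for illustration.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.util.Arrays

object QueryMetadataPitfall {
  // Assumed partition count of the input topic (illustrative only).
  val numPartitions = 4

  // Stand-in for a default partitioner: hash the serialized key, take it
  // modulo the partition count. This is NOT Kafka's actual hash function.
  def partitionFor(keyBytes: Array[Byte]): Int =
    Math.floorMod(Arrays.hashCode(keyBytes), numPartitions)

  // Simplified serializers for the two key types used in the issue's example.
  def intBytes(k: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(k).array()
  def stringBytes(k: String): Array[Byte] = k.getBytes(StandardCharsets.UTF_8)

  // Partition whose local store receives the writes: it is determined by the
  // Int key of the input record, not by the String keys put into the store.
  def actualPartition(recordKey: Int): Int = partitionFor(intBytes(recordKey))

  // Partition a lookup would derive if it only hashed the String store key.
  def lookedUpPartition(storeKey: String): Int = partitionFor(stringBytes(storeKey))

  def main(args: Array[String]): Unit = {
    val recordKey = 42
    val actual = actualPartition(recordKey)
    ('a' to 'j').foreach { c =>
      val guessed = lookedUpPartition(c.toString)
      val verdict = if (guessed == actual) "match" else "MISMATCH"
      println(s"store key '$c': written on partition $actual, lookup points to $guessed ($verdict)")
    }
  }
}
```

Under these assumptions, a store key agrees with the partition that actually holds it only by coincidence, which is the kind of "incorrect" answer the issue asks the documentation to warn about.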