[ https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Balaji Rao updated KAFKA-14070:
-------------------------------
    Summary: Improve documentation for queryMetadataForKey for state stores with Processor API  (was: Improve documentation for queryMetadataForKey)

Improve documentation for queryMetadataForKey for state stores with Processor API
---------------------------------------------------------------------------------

                Key: KAFKA-14070
                URL: https://issues.apache.org/jira/browse/KAFKA-14070
            Project: Kafka
         Issue Type: Improvement
         Components: streams
   Affects Versions: 3.2.0
           Reporter: Balaji Rao
           Priority: Minor

Using {{queryMetadataForKey}} for state stores populated through the Processor API is tricky. A processor can write to a state store under keys that differ from the input record's key. When it does, {{queryMetadataForKey}} cannot locate a store key on its own; it has to be called with the key of the input record that produced that store key. This can lead to the method being called with incorrect expectations. The documentation could be improved around this, and around using state stores with the Processor API in general.
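For context, the {{queryMetadataForKey(storeName, key, keySerializer)}} overload serializes the key with the supplied serializer and maps the resulting bytes to a partition the way Kafka's default partitioner does. The bytes passed in must therefore be the bytes the input topic was partitioned on. The sketch below is a self-contained model of that mapping, written for illustration and assuming no custom partitioner is involved. It ports the murmur2 hash used by Kafka's default partitioner and mimics the byte layouts of the Int serializer (4 bytes, big-endian) and the String serializer (UTF-8). {{KeyPartitionModel}}, {{partitionFor}}, {{intBytes}} and {{stringBytes}} are hypothetical names, not Kafka APIs.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical model of key -> partition mapping under Kafka's default partitioner:
// murmur2 over the serialized key, sign bit cleared, modulo the partition count.
object KeyPartitionModel {

  // Port of the murmur2 variant in org.apache.kafka.common.utils.Utils.murmur2
  def murmur2(data: Array[Byte]): Int = {
    val length = data.length
    val seed = 0x9747b28cL.toInt
    val m = 0x5bd1e995
    val r = 24
    var h = seed ^ length
    for (i <- 0 until length / 4) {
      val i4 = i * 4
      var k = (data(i4) & 0xff) + ((data(i4 + 1) & 0xff) << 8) +
        ((data(i4 + 2) & 0xff) << 16) + ((data(i4 + 3) & 0xff) << 24)
      k *= m
      k ^= k >>> r
      k *= m
      h *= m
      h ^= k
    }
    // Trailing bytes: this if-chain reproduces the fall-through switch of the Java original
    val tail = length & ~3
    val rem = length % 4
    if (rem >= 3) h ^= (data(tail + 2) & 0xff) << 16
    if (rem >= 2) h ^= (data(tail + 1) & 0xff) << 8
    if (rem >= 1) { h ^= data(tail) & 0xff; h *= m }
    h ^= h >>> 13
    h *= m
    h ^= h >>> 15
    h
  }

  def partitionFor(keyBytes: Array[Byte], numPartitions: Int): Int =
    (murmur2(keyBytes) & 0x7fffffff) % numPartitions

  // Same byte layout as Kafka's IntegerSerializer (4 bytes, big-endian)
  def intBytes(v: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(v).array()

  // Same byte layout as Kafka's StringSerializer (UTF-8 by default)
  def stringBytes(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val numPartitions = 6 // hypothetical partition count of "input-topic"
    println(s"Int record key 1 -> partition ${partitionFor(intBytes(1), numPartitions)}")
    println(s"String \"1-a\"     -> partition ${partitionFor(stringBytes("1-a"), numPartitions)}")
  }
}
```

The two printed partitions need not agree. Only the first reflects the task, and therefore the store partition, that processes record key 1. In the example below, that is where the store entry {{"1-a"}} is written.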
Example Scala snippet:
{code:scala}
val input = streamsBuilder.stream(
  "input-topic",
  Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
)

private val storeBuilder = Stores
  .keyValueStoreBuilder[String, String](
    Stores.inMemoryKeyValueStore("store"),
    Serdes.stringSerde,
    Serdes.stringSerde
  )

streamsBuilder.addStateStore(storeBuilder)

input.process(
  new ProcessorSupplier[Int, String, Void, Void] {
    override def get(): Processor[Int, String, Void, Void] =
      new Processor[Int, String, Void, Void] {
        var store: KeyValueStore[String, String] = _

        override def init(context: ProcessorContext[Void, Void]): Unit = {
          super.init(context)
          store = context.getStateStore("store")
        }

        override def process(record: Record[Int, String]): Unit = {
          ('a' to 'j').foreach(x =>
            store.put(s"${record.key}-$x", record.value)
          )
        }
      }
  },
  "store"
)
{code}

In the code sample above, AFAICT there is no way to determine which partition of {{store}} holds the key {{"1-a"}} by calling {{queryMetadataForKey}} with the string {{"1-a"}}. To find the partition, one has to call {{queryMetadataForKey}} with the key of the record that produced {{"1-a"}}, in this case the {{Int}} 1.

Example 2:
The same as above, but with a different {{process}} method.
{code:scala}
override def process(record: Record[Int, String]): Unit = {
  ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
}
{code}
Here the key {{"a"}} can exist in multiple partitions, with a different value in each. AFAICT, one must again call {{queryMetadataForKey}} with an {{Int}} record key to determine the partition in which that record stored a given {{String}} key.
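The behaviour described in Example 2 can be reproduced without a cluster. The sketch below is a toy model, not Kafka code. Each input partition's task owns a separate shard of {{store}}. The hypothetical {{toyPartition}} function (record key modulo partition count) stands in for the real murmur2-based default partitioner so that the result can be checked by hand. {{Example2Model}}, {{simulate}} and {{lookup}} are likewise illustrative names.

```scala
// Toy model of Example 2: one shard of "store" per input partition.
// toyPartition is a hypothetical stand-in for Kafka's default partitioner.
object Example2Model {

  def toyPartition(recordKey: Int, numPartitions: Int): Int =
    Math.floorMod(recordKey, numPartitions)

  // Replays Example 2's process(): for each record, write keys "a".."j" with the
  // record key as value into the shard owned by the record's partition.
  def simulate(recordKeys: Seq[Int], numPartitions: Int): Vector[Map[String, String]] =
    recordKeys.foldLeft(Vector.fill(numPartitions)(Map.empty[String, String])) { (shards, k) =>
      val p = toyPartition(k, numPartitions)
      val shard = ('a' to 'j').foldLeft(shards(p))((m, x) => m.updated(s"$x", s"$k"))
      shards.updated(p, shard)
    }

  // The record key selects the shard; the store key alone does not identify one.
  def lookup(shards: Vector[Map[String, String]], recordKey: Int, storeKey: String): Option[String] =
    shards(toyPartition(recordKey, shards.size)).get(storeKey)

  def main(args: Array[String]): Unit = {
    val shards = simulate(1 to 6, numPartitions = 3)
    shards.zipWithIndex.foreach { case (shard, p) =>
      println(s"partition $p: a -> ${shard.getOrElse("a", "<absent>")}")
    }
  }
}
```

With record keys 1 to 6 and 3 partitions, this prints {{a -> 6}}, {{a -> 4}} and {{a -> 5}} for partitions 0, 1 and 2. The store key {{"a"}} alone points to no single partition, while a record key does. The record key only selects the shard, though. Looking up {{"a"}} via record key 1 returns {{"4"}}, because record key 4 later wrote to the same shard. In a running application, the corresponding call would presumably be {{streams.queryMetadataForKey("store", 4, Serdes.intSerde.serializer())}}, assuming {{streams}} is the application's {{KafkaStreams}} instance.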