[ 
https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Balaji Rao updated KAFKA-14070:
-------------------------------
    Description: 
When using key-value state stores with Processor API, one can add key-value 
state stores of arbitrary key types to a topology. This could lead to the 
method `queryMetadataForKey` in `KafkaStreams` to be used with incorrect 
expectations.

In my understanding, `queryMetadataForKey` uses the source topics of the 
processor connected to the store to return the `KeyQueryMetadata`. This means 
that it could provide "incorrect" answers when used with key-value stores of 
arbitrary key types. The description of the method should be improved to make 
users aware of this pitfall.

Edit: Example scala code
{code:scala}
val input = streamsBuilder.stream(
  "input-topic",
  Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
)

private val storeBuilder = Stores
  .keyValueStoreBuilder[String, String](
    Stores.inMemoryKeyValueStore("store"),
    Serdes.stringSerde,
    Serdes.stringSerde
  )

streamsBuilder.addStateStore(storeBuilder)
input.process(
  new ProcessorSupplier[Int, String, Void, Void] {
    override def get(): Processor[Int, String, Void, Void] =
      new Processor[Int, String, Void, Void] {

        var store: KeyValueStore[String, String] = _

        override def init(context: ProcessorContext[Void, Void]): Unit = {
          super.init(context)
          store = context.getStateStore("store")
        }

        override def process(record: Record[Int, String]): Unit = {
          ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
        }
      }
  },
  "store"
){code}
 

In the code sample above, the usage of `queryMetadataForKey` on `store` to find 
the 

  was:
When using key-value state stores with Processor API, one can add key-value 
state stores of arbitrary key types to a topology. This could lead to the 
method `queryMetadataForKey` in `KafkaStreams` to be used with incorrect 
expectations.

In my understanding, `queryMetadataForKey` uses the source topics of the 
processor connected to the store to return the `KeyQueryMetadata`. This means 
that it could provide "incorrect" answers when used with key-value stores of 
arbitrary key types. The description of the method should be improved to make 
users aware of this pitfall.


> Improve documentation for queryMetadataForKey
> ---------------------------------------------
>
>                 Key: KAFKA-14070
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14070
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.0
>            Reporter: Balaji Rao
>            Priority: Minor
>
> When using key-value state stores with Processor API, one can add key-value 
> state stores of arbitrary key types to a topology. This could lead to the 
> method `queryMetadataForKey` in `KafkaStreams` to be used with incorrect 
> expectations.
> In my understanding, `queryMetadataForKey` uses the source topics of the 
> processor connected to the store to return the `KeyQueryMetadata`. This means 
> that it could provide "incorrect" answers when used with key-value stores of 
> arbitrary key types. The description of the method should be improved to make 
> users aware of this pitfall.
> Edit: Example scala code
> {code:scala}
> val input = streamsBuilder.stream(
>   "input-topic",
>   Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
> )
> private val storeBuilder = Stores
>   .keyValueStoreBuilder[String, String](
>     Stores.inMemoryKeyValueStore("store"),
>     Serdes.stringSerde,
>     Serdes.stringSerde
>   )
> streamsBuilder.addStateStore(storeBuilder)
> input.process(
>   new ProcessorSupplier[Int, String, Void, Void] {
>     override def get(): Processor[Int, String, Void, Void] =
>       new Processor[Int, String, Void, Void] {
>         var store: KeyValueStore[String, String] = _
>         override def init(context: ProcessorContext[Void, Void]): Unit = {
>           super.init(context)
>           store = context.getStateStore("store")
>         }
>         override def process(record: Record[Int, String]): Unit = {
>           ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
>         }
>       }
>   },
>   "store"
> ){code}
>  
> In the code sample above, the usage of `queryMetadataForKey` on `store` to 
> find the 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to