[ 
https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Balaji Rao updated KAFKA-14070:
-------------------------------
    Description: 
Using {{queryMetadataForKey}} with state stores populated through the 
Processor API is tricky. A processor can write to a state store under keys 
unrelated to the input record's key, in which case {{queryMetadataForKey}} 
cannot locate an entry from its store key alone: the caller has to know the 
key of the input record that produced it. This can lead to the method being 
called with incorrect expectations. The documentation could be improved on 
this point, and on using state stores with the Processor API in general.

Example snippet:
{code:scala}
// Input records are keyed by Int.
val input = streamsBuilder.stream(
  "input-topic",
  Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
)

// The store is keyed by String, independently of the input key type.
private val storeBuilder = Stores
  .keyValueStoreBuilder[String, String](
    Stores.inMemoryKeyValueStore("store"),
    Serdes.stringSerde,
    Serdes.stringSerde
  )

streamsBuilder.addStateStore(storeBuilder)
input.process(
  new ProcessorSupplier[Int, String, Void, Void] {
    override def get(): Processor[Int, String, Void, Void] =
      new Processor[Int, String, Void, Void] {

        var store: KeyValueStore[String, String] = _

        override def init(context: ProcessorContext[Void, Void]): Unit = {
          super.init(context)
          store = context.getStateStore("store")
        }

        // Each input record writes ten entries under derived String keys,
        // e.g. "1-a" through "1-j" for record key 1.
        override def process(record: Record[Int, String]): Unit = {
          ('a' to 'j').foreach(x =>
            store.put(s"${record.key}-$x", record.value)
          )
        }
      }
  },
  "store"
)
{code}
In the code sample above, AFAICT there is no way to determine which partition 
of {{store}} contains the key {{"1-a"}} by calling {{queryMetadataForKey}} 
with the string {{"1-a"}}. One has to call {{queryMetadataForKey}} with the 
key of the record that produced {{"1-a"}}, in this case the {{Int}} 1, to find 
the partition.
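
To make the mismatch concrete, here is a minimal, self-contained sketch that models the routing outside of Kafka. It assumes the partition holding a store entry is derived from the serialized key of the input record. {{toyPartition}}, {{numPartitions}} and the hash used are hypothetical stand-ins for illustration, not Kafka's actual partitioner.
{code:scala}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object StoreKeyVsRecordKey {
  // Hypothetical partition count for "input-topic".
  val numPartitions = 4

  // Stand-in partitioner: hash the serialized key bytes and take a
  // non-negative modulo. Kafka's default partitioner uses a different hash.
  def toyPartition(keyBytes: Array[Byte]): Int =
    Math.floorMod(java.util.Arrays.hashCode(keyBytes), numPartitions)

  // Simplified serializers for the two key types involved.
  def intBytes(k: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(k).array()
  def stringBytes(k: String): Array[Byte] = k.getBytes(StandardCharsets.UTF_8)

  def main(args: Array[String]): Unit = {
    val recordKey = 1
    // Partition where the record with key 1 is processed, and therefore
    // where the store entry "1-a" ends up.
    val actual = toyPartition(intBytes(recordKey))
    // Partition a lookup computes when it is handed the store key instead.
    val guessed = toyPartition(stringBytes("1-a"))
    println(s"record key $recordKey -> partition $actual")
    println(s"store key 1-a -> partition $guessed")
  }
}
{code}
Nothing ties the two results together, so a lookup based on the store key can point at a partition that does not hold the entry.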

 
Example 2:

The same as above, but with a different {{process}} method.
{code:scala}
override def process(record: Record[Int, String]): Unit = {
  ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
}{code}
In this case the key {{"a"}} could exist in multiple partitions, with a 
different value in each. AFAICT, one must then call {{queryMetadataForKey}} 
with an {{Int}} record key to determine the partition where a given 
{{String}} would be stored.
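
A small simulation of Example 2, again outside of Kafka, shows how the same store key ends up in several partitions. The per-partition maps, {{toyPartition}} and {{numPartitions}} are hypothetical stand-ins for the per-task shards of {{store}} and for the real partitioner.
{code:scala}
import scala.collection.mutable

object SharedStoreKeyAcrossPartitions {
  // Hypothetical partition count for "input-topic".
  val numPartitions = 3

  // Stand-in routing rule: Int record key modulo the partition count.
  def toyPartition(recordKey: Int): Int =
    Math.floorMod(recordKey, numPartitions)

  // One in-memory map per partition, standing in for the shards of "store".
  val shards: Vector[mutable.Map[String, String]] =
    Vector.fill(numPartitions)(mutable.Map.empty[String, String])

  // Mirrors the process method of Example 2: the store key ignores the
  // record key, and the record key becomes the value.
  def process(recordKey: Int): Unit =
    ('a' to 'j').foreach { x =>
      shards(toyPartition(recordKey)).put(s"$x", s"$recordKey")
    }

  def main(args: Array[String]): Unit = {
    Seq(1, 2, 3).foreach(process)
    shards.zipWithIndex.foreach { case (shard, p) =>
      println(s"partition $p: a -> ${shard.get("a")}")
    }
  }
}
{code}
Every shard holds its own value for {{"a"}}, so the store key cannot identify a single partition. Only the record key does.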

  was:
When using key-value state stores with Processor API, one can add key-value 
state stores of arbitrary key types to a topology. This could lead to the 
method `queryMetadataForKey` in `KafkaStreams` to be used with incorrect 
expectations.

In my understanding, `queryMetadataForKey` uses the source topics of the 
processor connected to the store to return the `KeyQueryMetadata`. This means 
that it could provide "incorrect" answers when used with key-value stores of 
arbitrary key types. The description of the method should be improved to make 
users aware of this pitfall.

Edit: Example scala code
{code:scala}
val input = streamsBuilder.stream(
  "input-topic",
  Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
)

private val storeBuilder = Stores
  .keyValueStoreBuilder[String, String](
    Stores.inMemoryKeyValueStore("store"),
    Serdes.stringSerde,
    Serdes.stringSerde
  )

streamsBuilder.addStateStore(storeBuilder)
input.process(
  new ProcessorSupplier[Int, String, Void, Void] {
    override def get(): Processor[Int, String, Void, Void] =
      new Processor[Int, String, Void, Void] {

        var store: KeyValueStore[String, String] = _

        override def init(context: ProcessorContext[Void, Void]): Unit = {
          super.init(context)
          store = context.getStateStore("store")
        }

        override def process(record: Record[Int, String]): Unit = {
          ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
        }
      }
  },
  "store"
){code}
 

In the code sample above, the usage of `queryMetadataForKey` on `store` to find 
the 


> Improve documentation for queryMetadataForKey
> ---------------------------------------------
>
>                 Key: KAFKA-14070
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14070
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 3.2.0
>            Reporter: Balaji Rao
>            Priority: Minor



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
