Hi Debasish,

You can just implement the QueryableStoreType interface. Take a look here
for an example:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java#L77

Then you just pass your implementation to
`kafkaStreams.store(storeName, yourQueryableType)`.
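
In case a sketch helps, here is roughly what that could look like in Scala.
This is only an outline under some assumptions: `ReadableBFStore`, its `read`
method, `BFStoreType` and `BFStoreTypeWrapper` are made-up names for
illustration (I don't know what your store's read API looks like), and your
actual store class would need to mix in `ReadableBFStore` in addition to
`StateStore`. `StateStoreProvider` lives in the `internals` package, so its
shape may differ between Kafka versions.

```scala
import org.apache.kafka.streams.processor.StateStore
import org.apache.kafka.streams.state.QueryableStoreType
import org.apache.kafka.streams.state.internals.StateStoreProvider
import scala.collection.JavaConverters._

// Hypothetical read-only view of the custom store. The method name and
// signature are placeholders for whatever the store exposes to queries.
trait ReadableBFStore[T] {
  def read(value: T): Boolean
}

// Wrapper handed back to callers. A single instance may host several local
// stores with the same name (one per task), so each query is fanned out over
// all of them.
class BFStoreTypeWrapper[T](provider: StateStoreProvider,
                            storeName: String,
                            storeType: QueryableStoreType[ReadableBFStore[T]])
    extends ReadableBFStore[T] {

  override def read(value: T): Boolean =
    provider.stores(storeName, storeType).asScala.exists(_.read(value))
}

// The QueryableStoreType implementation itself.
class BFStoreType[T] extends QueryableStoreType[ReadableBFStore[T]] {

  // Only accept state stores that expose the read-only view.
  override def accepts(stateStore: StateStore): Boolean =
    stateStore.isInstanceOf[ReadableBFStore[_]]

  override def create(provider: StateStoreProvider,
                      storeName: String): ReadableBFStore[T] =
    new BFStoreTypeWrapper[T](provider, storeName, this)
}

// Usage, assuming `streams` is a running KafkaStreams instance:
//   val bf: ReadableBFStore[String] =
//     streams.store("log-counts", new BFStoreType[String])
//   bf.read("some-key")
```

The wrapper looks the stores up on every call rather than caching them, so
that a query after a rebalance sees the current set of local stores.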

HTH,
Damian

On Fri, 30 Jun 2017 at 13:00 Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> Hi -
>
> I have a custom state store in my Kafka Streams application. I have
> developed the whole Topology and the Processor abstractions. I have the
> custom state store implemented as well and have the following builder for
> hooking up the processors and the store .. Here's the Scala snippet ..
>
> builder.addSource("Source", "server-log")
>        .addProcessor("Process", () => new WeblogProcessor(), "Source")
>        .addStateStore(new BFStoreSupplier[String]("log-counts",
>          stringSerde, true, changelogConfig.asJava), "Process")
>
> Now I am developing the infrastructure of querying from the custom store ..
> For a KeyValueStore, I can do something like the following ..
>
> val q: QueryableStoreType[ReadOnlyKeyValueStore[K, V]] =
>   QueryableStoreTypes.keyValueStore()
> val localStore: ReadOnlyKeyValueStore[K, V] = streams.store(store, q)
> localStore.get(key)
>
> For a custom store type, how do I create an instance of
> QueryableStoreType[...] ?
>
> any help will be appreciated ..
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>
