Hi -

I have a Kafka Streams application with a custom state store. The Topology
and the Processor abstractions are in place, the custom state store is
implemented, and the following builder wires the processors and the store
together. Here's the Scala snippet:

builder.addSource("Source", "server-log")
       .addProcessor("Process", () => new WeblogProcessor(), "Source")
       .addStateStore(
         new BFStoreSupplier[String]("log-counts", stringSerde, true, changelogConfig.asJava),
         "Process")

Now I am building the infrastructure for querying the custom store.
For a KeyValueStore, I can do something like the following:

val q: QueryableStoreType[ReadOnlyKeyValueStore[K, V]] =
  QueryableStoreTypes.keyValueStore()
val localStore: ReadOnlyKeyValueStore[K, V] = streams.store(store, q)
localStore.get(key)

For a custom store type, how do I create an instance of
QueryableStoreType[...]?
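
From reading the QueryableStoreType interface, my guess is that I have to
implement its two methods, accepts and create, myself. Below is a minimal,
untested sketch of what I have in mind. ReadableBFStore, mightContain,
BFStoreType and BFStoreTypeWrapper are hypothetical names, and I am assuming
that my store class mixes in ReadableBFStore. Note that StateStoreProvider
lives in an internals package, so it may change between releases.

```scala
import org.apache.kafka.streams.processor.StateStore
import org.apache.kafka.streams.state.QueryableStoreType
import org.apache.kafka.streams.state.internals.StateStoreProvider
import scala.collection.JavaConverters._

// Hypothetical read-only view of the custom store; the actual store
// class is assumed to mix this trait in.
trait ReadableBFStore[T] {
  def mightContain(value: T): Boolean
}

// Hypothetical QueryableStoreType for the custom store.
class BFStoreType[T] extends QueryableStoreType[ReadableBFStore[T]] {
  // Accept only state stores that expose the read-only view.
  override def accepts(stateStore: StateStore): Boolean =
    stateStore.isInstanceOf[ReadableBFStore[_]]

  // Hand back a facade that looks up the underlying stores on every query.
  override def create(storeProvider: StateStoreProvider,
                      storeName: String): ReadableBFStore[T] =
    new BFStoreTypeWrapper[T](storeProvider, storeName, this)
}

// Facade over all local store instances registered under storeName.
class BFStoreTypeWrapper[T](
    provider: StateStoreProvider,
    storeName: String,
    storeType: QueryableStoreType[ReadableBFStore[T]])
    extends ReadableBFStore[T] {

  // True if any local instance of the store reports a possible match.
  override def mightContain(value: T): Boolean =
    provider.stores(storeName, storeType).asScala.exists(_.mightContain(value))
}
```

If this is the right direction, I would expect to query it with something
like streams.store("log-counts", new BFStoreType[String]) followed by
mightContain(key), mirroring the KeyValueStore case above.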

Any help will be appreciated.

Regards,

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
