Hi Debasish, You can just implement the QueryableStoreType interface. You can take a look here: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java#L77 for an example.
Then you just pass your implementation to `kafkaStreams.store(storeName, yourQueryableType)` HTH, Damian On Fri, 30 Jun 2017 at 13:00 Debasish Ghosh <ghosh.debas...@gmail.com> wrote: > Hi - > > I have a custom state store in my Kafka Streams application. I have > developed the whole Topology and the Processor abstractions. I have the > custom state store implemented as well and have the following builder for > hooking up the processors and the store .. Here's the Scala snippet .. > > builder.addSource("Source", "server-log") > .addProcessor("Process", () => new WeblogProcessor(), "Source") > .addStateStore(new BFStoreSupplier[String]("log-counts", > stringSerde, true, changelogConfig.asJava), "Process") > > Now I am developing the infrastructure of querying from the custom store .. > For a KeyValueStore, I can do something like the following .. > > val q: QueryableStoreType[ReadOnlyKeyValueStore[K, V]] = > QueryableStoreTypes.keyValueStore() > val localStore: ReadOnlyKeyValueStore[K, V] = streams.store(store, q) > localStore.get(key) > > For a custom store type, how do I create an instance of > QueryableStoreType[...] ? > > any help will be appreciated .. > > regards. > > -- > Debasish Ghosh > http://manning.com/ghosh2 > http://manning.com/ghosh > > Twttr: @debasishg > Blog: http://debasishg.blogspot.com > Code: http://github.com/debasishg >