Thanks for the pointer. I finally figured it out. I am now facing another issue with the custom state store, and I have sent a separate mail to the forum about it.
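
For anyone finding this thread later, here is a minimal sketch of the approach Damian describes in the quoted reply: implement `QueryableStoreType` for the custom store and pass an instance to `streams.store(...)`. The names `ReadableBFStore`, `BFStoreType`, `BFStoreFacade` and the `mightContain` method are hypothetical and only illustrate the shape; the actual store in this thread may expose a different read-only API. The sketch assumes the Kafka Streams API of that period, where `StateStoreProvider` lives in an `internals` package and may change between releases.

```scala
import org.apache.kafka.streams.processor.StateStore
import org.apache.kafka.streams.state.QueryableStoreType
import org.apache.kafka.streams.state.internals.StateStoreProvider
import scala.collection.JavaConverters._

// Hypothetical read-only view of the custom store. The real store class
// would implement this trait in addition to StateStore.
trait ReadableBFStore[T] {
  def mightContain(value: T): Boolean
}

// The queryable type tells Kafka Streams which stores match and how to
// wrap them for callers.
class BFStoreType[T] extends QueryableStoreType[ReadableBFStore[T]] {

  // Accept only state stores that expose the read-only view.
  override def accepts(stateStore: StateStore): Boolean =
    stateStore.isInstanceOf[ReadableBFStore[_]]

  // Wrap all matching local store instances behind a single facade.
  override def create(provider: StateStoreProvider,
                      storeName: String): ReadableBFStore[T] =
    new BFStoreFacade[T](provider, storeName, this)
}

// Facade that queries every local instance of the named store.
class BFStoreFacade[T](provider: StateStoreProvider,
                       storeName: String,
                       storeType: QueryableStoreType[ReadableBFStore[T]])
    extends ReadableBFStore[T] {

  override def mightContain(value: T): Boolean =
    provider.stores(storeName, storeType).asScala.exists(_.mightContain(value))
}
```

Under these assumptions, a query would then look like `streams.store("log-counts", new BFStoreType[String]).mightContain(key)`, mirroring the `QueryableStoreTypes.keyValueStore()` usage in the original question.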

regards.

On Fri, Jun 30, 2017 at 11:12 PM, Damian Guy <damian....@gmail.com> wrote:

> Hi Debasish,
>
> You can just implement the QueryableStoreType interface. You can take a
> look here:
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/QueryableStoreTypes.java#L77
> for an example.
>
> Then you just pass your implementation to
> `kafkaStreams.store(storeName, yourQueryableType)`
>
> HTH,
> Damian
>
> On Fri, 30 Jun 2017 at 13:00 Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> Hi -
>>
>> I have a custom state store in my Kafka Streams application. I have
>> developed the whole Topology and the Processor abstractions. I have the
>> custom state store implemented as well and have the following builder for
>> hooking up the processors and the store .. Here's the Scala snippet ..
>>
>> builder.addSource("Source", "server-log")
>>   .addProcessor("Process", () => new WeblogProcessor(), "Source")
>>   .addStateStore(new BFStoreSupplier[String]("log-counts",
>>     stringSerde, true, changelogConfig.asJava), "Process")
>>
>> Now I am developing the infrastructure of querying from the custom store
>> ..
>> For a KeyValueStore, I can do something like the following ..
>>
>> val q: QueryableStoreType[ReadOnlyKeyValueStore[K, V]] =
>>   QueryableStoreTypes.keyValueStore()
>> val localStore: ReadOnlyKeyValueStore[K, V] = streams.store(store, q)
>> localStore.get(key)
>>
>> For a custom store type, how do I create an instance of
>> QueryableStoreType[...] ?
>>
>> any help will be appreciated ..
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg