Hi,

I have a custom state store in my Kafka Streams application. I have developed the whole topology and the processor abstractions, implemented the custom state store, and have the following builder for hooking up the processors and the store. Here's the Scala snippet:

builder.addSource("Source", "server-log")
       .addProcessor("Process", () => new WeblogProcessor(), "Source")
       .addStateStore(new BFStoreSupplier[String]("log-counts", stringSerde, true, changelogConfig.asJava), "Process")

Now I am developing the infrastructure for querying the custom store. For a KeyValueStore, I can do something like the following:

val q: QueryableStoreType[ReadOnlyKeyValueStore[K, V]] = QueryableStoreTypes.keyValueStore()
val localStore: ReadOnlyKeyValueStore[K, V] = streams.store(store, q)
localStore.get(key)

For a custom store type, how do I create an instance of QueryableStoreType[...]?

Any help will be appreciated.

Regards.

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh
Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg