Hi Pirow,

You can configure the auto offset reset for your stream source's consumer to "earliest" if you want it to consume all available data when no committed offset exists. This will populate the state store on first run.
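E.g., in your Streams properties (a rough sketch only; the application id
and bootstrap servers below are placeholders for your own values):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "json-decorator");    // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    // With no committed offsets (e.g. on the application's first run),
    // the consumers fall back to this policy and read the topic from the
    // beginning, so the store gets populated with all existing records.
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");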
Cheers,

Liam Clarke-Hutchinson

On Thu, 20 Aug. 2020, 11:58 pm Pirow Engelbrecht, <pirow.engelbre...@etion.co.za> wrote:

> Hi Bill,
>
> Yes, that seems to be exactly what I need. I've instantiated this global
> store with:
>
> topology.addGlobalStore(storeBuilder, "KVstoreSource",
>     Serdes.String().deserializer(), Serdes.String().deserializer(),
>     this.config_kvStoreTopic, "KVprocessor", KeyValueProcessor::new);
>
> I've added a KeyValueProcessor that puts incoming Kafka key-value pairs
> into the store. The problem is that when the application starts for the
> first time, it does not process any key-value pairs already in the Kafka
> topic. Is there a way around this?
>
> Thanks
>
> Pirow Engelbrecht
> System Engineer
>
> *From:* Bill Bejeck <b...@confluent.io>
> *Sent:* Wednesday, 19 August 2020 3:53 PM
> *To:* users@kafka.apache.org
> *Subject:* Re: Kafka Streams Key-value store question
>
> Hi Pirow,
>
> If I'm understanding your requirements correctly, I think using a global
> store
> <https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addGlobalStore-org.apache.kafka.streams.state.StoreBuilder-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.processor.ProcessorSupplier->
> will work for you.
>
> HTH,
> Bill
>
> On Wed, Aug 19, 2020 at 8:53 AM Pirow Engelbrecht <
> pirow.engelbre...@etion.co.za> wrote:
>
> > Hello,
> >
> > We're building a JSON decorator using Kafka Streams' Processor API.
> >
> > The process, briefly, is that a piece of JSON is consumed from an input
> > topic (keys are null, value is the JSON). The JSON contains a field
> > (e.g. "thisField") with a value (e.g. "someLink"). This value (and a
> > timestamp) is used to look up another piece of JSON from a key-value
> > topic (keys are all the different values of "thisField", values are
> > JSON). This key-value topic is created by another service in Kafka.
> > This additional piece of JSON then gets appended to the input JSON and
> > the result gets written to an output topic (keys are null, value is now
> > the original JSON + lookup JSON).
> >
> > To do the query against a key-value store, ideally I want Kafka Streams
> > to directly create and update a windowed key-value store in memory (or
> > on disk) from my key-value topic in Kafka, but I am unable to find a
> > way to specify this through the StoreBuilder interface. Does anybody
> > know how to do this?
> >
> > Here is my current StoreBuilder code snippet:
> >
> > StoreBuilder<WindowStore<String, String>> storeBuilder =
> >     Stores.windowStoreBuilder(
> >         Stores.persistentWindowStore("lookupStore",
> >             Duration.ofDays(14600), Duration.ofDays(14600), false),
> >         Serdes.String(),
> >         Serdes.String());
> >
> > storeBuilder.build();
> >
> > Currently my workaround is to have a sink for the key-value store and
> > then create/update this key-value store using a node in the processing
> > topology, but this has issues when restarting the service, i.e. when
> > the service is restarted, the key-value store topic needs to be
> > consumed from the start to rebuild the store in memory, but the sink
> > would have written commit offsets, which prevents the topic from being
> > consumed from the start. I also cannot use streams.cleanUp() as this
> > will reset all the sources in my topology (my other source ingests
> > records from the input topic).
> >
> > Thanks
> >
> > Pirow Engelbrecht
> > System Engineer
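P.S. For what it's worth, a minimal sketch of what the KeyValueProcessor
you described could look like (assuming the 2.6 Processor API; "KVstore"
is a hypothetical store name - use whatever name your StoreBuilder was
created with):

    import org.apache.kafka.streams.processor.Processor;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.apache.kafka.streams.state.KeyValueStore;

    public class KeyValueProcessor implements Processor<String, String> {

        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            // "KVstore" is an assumed name - it must match the name the
            // StoreBuilder passed to addGlobalStore was created with.
            store = (KeyValueStore<String, String>) context.getStateStore("KVstore");
        }

        @Override
        public void process(String key, String value) {
            // Copy each record from the source topic into the global store.
            store.put(key, value);
        }

        @Override
        public void close() {}
    }

With auto offset reset at "earliest", this processor will see all records
already in the topic on first start, not just new arrivals.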