You need to set the auto offset reset to earliest; it uses latest by default.
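A minimal sketch of where that property could go, using only java.util.Properties so that it runs without the Kafka jars on the classpath. The class name, application id, and broker address are placeholders, not taken from this thread. The literal key "consumer.auto.offset.reset" is what the StreamsConfig.consumerPrefix(...) expression resolves to. This only shows where the setting goes; it does not demonstrate its effect on a global store.

```java
import java.util.Properties;

public class OffsetResetSketch {

    // Builds the Properties object that would be passed to the KafkaStreams constructor.
    public static Properties buildStreamsProperties() {
        Properties props = new Properties();
        // Placeholder values, not from this thread.
        props.put("application.id", "json-decorator");
        props.put("bootstrap.servers", "localhost:9092");
        // Same key that StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG) yields.
        props.put("consumer.auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildStreamsProperties();
        System.out.println(props.getProperty("consumer.auto.offset.reset"));
    }
}
```

In a real application the same entry would normally be written with the StreamsConfig and ConsumerConfig constants rather than a string literal.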
StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"

On Thu, 20 Aug 2020 at 13:58, Pirow Engelbrecht <pirow.engelbre...@etion.co.za> wrote:

> Hi Bill,
>
> Yes, that seems to be exactly what I need. I’ve instantiated this global
> store with:
>
> topology.addGlobalStore(storeBuilder, "KVstoreSource",
>     Serdes.String().deserializer(), Serdes.String().deserializer(),
>     this.config_kvStoreTopic, "KVprocessor", KeyValueProcessor::new);
>
> I’ve added a KeyValueProcessor that puts incoming Kafka key-value pairs
> into the store. The problem is that if the application starts for the first
> time, it does not process any key-value pairs already in the Kafka topic.
> Is there a way around this?
>
> Thanks
>
> *Pirow Engelbrecht*
> System Engineer
>
> *E.* pirow.engelbre...@etion.co.za
> *T.* +27 12 678 9740 (ext. 9879)
> *M.* +27 63 148 3376
>
> 76 Regency Drive | Irene | Centurion | 0157
> *www.etion.co.za*
>
> *From:* Bill Bejeck <b...@confluent.io>
> *Sent:* Wednesday, 19 August 2020 3:53 PM
> *To:* users@kafka.apache.org
> *Subject:* Re: Kafka Streams Key-value store question
>
> Hi Pirow,
>
> If I'm understanding your requirements correctly, I think using a global
> store
> <https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addGlobalStore-org.apache.kafka.streams.state.StoreBuilder-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.processor.ProcessorSupplier->
> will work for you.
>
> HTH,
> Bill
>
> On Wed, Aug 19, 2020 at 8:53 AM Pirow Engelbrecht <pirow.engelbre...@etion.co.za> wrote:
>
> > Hello,
> >
> > We’re building a JSON decorator using Kafka Streams’ processing API.
> >
> > The process is briefly that a piece of JSON should be consumed from an
> > input topic (keys are null, value is the JSON). The JSON contains a field
> > (e.g. “thisField”) with a value (e.g. “someLink”). This value (and a
> > timestamp) is used to look up another piece of JSON from a key-value topic
> > (keys are all the different values of “thisField”, values are JSON). This
> > key-value topic is created by another service in Kafka. This additional
> > piece of JSON then gets appended to the input JSON and the result gets
> > written to an output topic (keys are null, value is now the original JSON +
> > lookup JSON).
> >
> > To do the query against a key-value store, ideally I want Kafka Streams to
> > directly create and update a window key-value store in memory (or on disk)
> > from my key-value topic in Kafka, but I am unable to find a way to specify
> > this through the StoreBuilder interface. Does anybody know how to do this?
> >
> > Here is my current StoreBuilder code snippet:
> >
> > StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
> >     Stores.persistentWindowStore("loopkupStore",
> >         Duration.ofDays(14600), Duration.ofDays(14600), false),
> >     Serdes.String(),
> >     Serdes.String());
> >
> > storeBuilder.build();
> >
> > Currently my workaround is to have a sink for the key-value store and then
> > create/update this key-value store using a node in the processing topology,
> > but this has issues when restarting the service, i.e.
> > when the service is restarted, the key-value store topic needs to be
> > consumed from the start to rebuild the store in memory, but the sink would
> > have written commit offsets, which prevents the topic from being consumed
> > from the start. I also cannot use streams.cleanUp() as this will reset all
> > the sinks in my topology (my other sink ingests records from the input
> > topic).
> >
> > Thanks
> >
> > *Pirow Engelbrecht*
> > System Engineer

--
*Nicolas Carlot*
Lead dev
nicolas.car...@chronopost.fr