You need to set the auto offset reset to earliest, it uses latest as
default.

StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
"earliest"

Le jeu. 20 août 2020 à 13:58, Pirow Engelbrecht <
pirow.engelbre...@etion.co.za> a écrit :

> Hi Bill,
>
>
>
> Yes, that seems to be exactly what I need. I’ve instantiated this global
> store with:
>
> topology.addGlobalStore(storeBuilder, "KVstoreSource", Serdes.String().
> deserializer(), Serdes.String().deserializer(), this.config_kvStoreTopic,
> "KVprocessor", KeyValueProcessor::new);
>
>
>
>
>
> I’ve added a KeyValueProcessor that puts incoming Kafka key-value pairs
> into the store. The problem is that if the application starts for the first
> time, it does not process any key-value pairs already in the Kafka topic.
> Is there a way around this?
>
>
>
> Thanks
>
>
>
> *Pirow Engelbrecht*
> System Engineer
>
> *E.* pirow.engelbre...@etion.co.za
> *T.* +27 12 678 9740 (ext. 9879)
> *M.* +27 63 148 3376
>
> 76 Regency Drive | Irene | Centurion | 0157
> <https://goo.gl/maps/v9ZbwjqpPyL2>
> *www.etion.co.za* <https://www.parsec.co.za/>
>
> <https://www.parsec.co.za/>
>
> Facebook
> <https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr> |
> YouTube <https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A> |
> LinkedIn <https://www.linkedin.com/company/etionltd> | Twitter
> <https://twitter.com/Etionlimited> | Instagram
> <https://www.instagram.com/Etionlimited/>
>
>
>
> *From:* Bill Bejeck <b...@confluent.io>
> *Sent:* Wednesday, 19 August 2020 3:53 PM
> *To:* users@kafka.apache.org
> *Subject:* Re: Kafka Streams Key-value store question
>
>
>
> Hi Pirow,
>
> If I'm understanding your requirements correctly, I think using a global
> store
> <
> https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addGlobalStore-org.apache.kafka.streams.state.StoreBuilder-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.processor.ProcessorSupplier-
> >
> will
> work for you.
>
> HTH,
> Bill
>
> On Wed, Aug 19, 2020 at 8:53 AM Pirow Engelbrecht <
> pirow.engelbre...@etion.co.za> wrote:
>
> > Hello,
> >
> >
> >
> > We’re building a JSON decorator using Kafka Streams’ processing API.
> >
> >
> >
> > The process is briefly that a piece of JSON should be consumed from an
> > input topic (keys are null, value is the JSON). The JSON contains a field
> > (e.g. “thisField”) with a value (e.g. “someLink”) . This value (and a
> > timestamp) is used to look-up another piece JSON from a key-value topic
> > (keys are all the different values of “thisField”, values are JSON). This
> > key-value topic is created by another service in Kafka. This additional
> > piece of JSON then gets appended to the input JSON and the result gets
> > written to an output topic (keys are null, value is now the original
> JSON +
> > lookup JSON).
> >
> >
> >
> > To do the query against a key-value store, ideally I want Kafka Streams
> to
> > directly create and update a window key-value store in memory (or disk)
> > from my key-value topic in Kafka, but I am unable to find a way to
> specify
> > this through the StoreBuilder interface. Does anybody know how to do
> this?
> >
> > Here is my current Storebuilder code snippet:
> >
> > StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.
> > windowStoreBuilder(
> >
> > Stores.persistentWindowStore("loopkupStore",
> > Duration.ofDays(14600), Duration.ofDays(14600), false),
> >
> > Serdes.String(),
> >
> > Serdes.String());
> >
> > storeBuilder.build();
> >
> >
> >
> >
> >
> > Currently my workaround is to have a sink for the key-value store and
> then
> > create/update this key-value store using a node in the processing
> topology,
> > but this has issues when restarting the service, i.e. when the service is
> > restarted, the key-value store topic needs to be consumed from the start
> to
> > rebuild the store in memory, but the sink would have written commit
> offsets
> > which prevents the topic to be consumed from the start. I also cannot use
> > streams.cleanUp() as this will reset all the sinks in my topology (y
> other
> > sink ingests records from the input topic).
> >
> >
> >
> > Thanks
> >
> >
> >
> > *Pirow Engelbrecht*
> > System Engineer
> >
> > *E.* pirow.engelbre...@etion.co.za
> > *T.* +27 12 678 9740 (ext. 9879)
> > *M.* +27 63 148 3376
> >
> > 76 Regency Drive | Irene | Centurion | 0157
> > <https://goo.gl/maps/v9ZbwjqpPyL2>
> > *www.etion.co.za <https://www.parsec.co.za/>*
> >
> > <https://www.parsec.co.za/>
> >
> > Facebook
> > <https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr> |
> > YouTube <https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A> |
> > LinkedIn <https://www.linkedin.com/company/etionltd> | Twitter
> > <https://twitter.com/Etionlimited> | Instagram
> > <https://www.instagram.com/Etionlimited/>
> >
> >
> >
>


-- 
*Nicolas Carlot*

Lead dev
|  | nicolas.car...@chronopost.fr


*Veuillez noter qu'à partir du 20 mai, le siège Chronopost déménage. La
nouvelle adresse est : 3 boulevard Romain Rolland 75014 Paris*

[image: Logo Chronopost]
| chronopost.fr <http://www.chronopost.fr/>
Suivez nous sur Facebook <https://fr-fr.facebook.com/chronopost> et Twitter
<https://twitter.com/chronopost>.

[image: DPD Group]

Reply via email to