`auto.offset.reset` does not apply to global-store topics. At startup, the app always does a "seek-to-beginning" on a global-store topic, bootstraps the global store, and only afterwards starts the actual processing.
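To make the bootstrap behavior concrete, here is a toy model in plain Java. It is not the Kafka Streams API, and every class and method name in it is hypothetical. It assumes a single-partition global topic held in a list. At startup it always reads from offset 0 up to the end offset captured at that moment, then keeps applying newly appended records. Its read position lives only in memory and is never committed, so a restart replays the topic from the beginning again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of global-store bootstrapping; NOT the Kafka Streams API.
// All names here are hypothetical.
class GlobalStoreBootstrapModel {

    // One record of the (assumed single-partition) global-store topic.
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Append-only log standing in for the global-store topic.
    static final class TopicLog {
        private final List<Rec> records = new ArrayList<>();

        void append(String key, String value) {
            records.add(new Rec(key, value));
        }

        long endOffset() {
            return records.size();
        }

        Rec read(long offset) {
            return records.get((int) offset);
        }
    }

    // In-memory stand-in for the global key-value store.
    final Map<String, String> store = new HashMap<>();

    // Read position: kept only in memory, never committed anywhere.
    private long position = 0;

    // Startup: always "seek to beginning", then restore up to the end offset
    // observed right now. No committed offset and no auto.offset.reset
    // setting is consulted at any point.
    void bootstrap(TopicLog log) {
        position = 0;
        applyUpTo(log, log.endOffset());
    }

    // Normal processing after bootstrap: apply records appended since then.
    void processNewRecords(TopicLog log) {
        applyUpTo(log, log.endOffset());
    }

    long position() {
        return position;
    }

    private void applyUpTo(TopicLog log, long end) {
        while (position < end) {
            Rec r = log.read(position++);
            store.put(r.key, r.value); // the latest value for a key wins
        }
    }
}
```

A freshly started instance ends up with the same store contents as one that has been running all along, because it replays the whole topic. In Kafka Streams this bootstrap is done by the library before processing starts, not by user code.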
However, no offsets are committed for global-store topics. Maybe this is the reason why you think no data was read?

-Matthias

On 8/20/20 5:30 AM, Liam Clarke-Hutchinson wrote:
> Hi Pirow,
>
> You can configure the auto offset reset for your stream source's consumer
> to "earliest" if you want to consume all available data when no committed
> offset exists. This will populate the state store on the first run.
>
> Cheers,
>
> Liam Clarke-Hutchinson
>
>
> On Thu, 20 Aug. 2020, 11:58 pm Pirow Engelbrecht, <pirow.engelbre...@etion.co.za> wrote:
>
>> Hi Bill,
>>
>> Yes, that seems to be exactly what I need. I’ve instantiated this global
>> store with:
>>
>> topology.addGlobalStore(storeBuilder, "KVstoreSource",
>>     Serdes.String().deserializer(), Serdes.String().deserializer(),
>>     this.config_kvStoreTopic, "KVprocessor", KeyValueProcessor::new);
>>
>> I’ve added a KeyValueProcessor that puts incoming Kafka key-value pairs
>> into the store. The problem is that when the application starts for the
>> first time, it does not process any key-value pairs already in the Kafka
>> topic. Is there a way around this?
>>
>> Thanks
>>
>> *Pirow Engelbrecht*
>> System Engineer
>>
>> *E.* pirow.engelbre...@etion.co.za
>> *T.* +27 12 678 9740 (ext. 9879)
>> *M.* +27 63 148 3376
>>
>> 76 Regency Drive | Irene | Centurion | 0157
>> <https://goo.gl/maps/v9ZbwjqpPyL2>
>> *www.etion.co.za* <https://www.parsec.co.za/>
>>
>> Facebook
>> <https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr> |
>> YouTube <https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A> |
>> LinkedIn <https://www.linkedin.com/company/etionltd> | Twitter
>> <https://twitter.com/Etionlimited> | Instagram
>> <https://www.instagram.com/Etionlimited/>
>>
>> *From:* Bill Bejeck <b...@confluent.io>
>> *Sent:* Wednesday, 19 August 2020 3:53 PM
>> *To:* users@kafka.apache.org
>> *Subject:* Re: Kafka Streams Key-value store question
>>
>> Hi Pirow,
>>
>> If I'm understanding your requirements correctly, I think using a global
>> store
>> <https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addGlobalStore-org.apache.kafka.streams.state.StoreBuilder-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.processor.ProcessorSupplier->
>> will work for you.
>>
>> HTH,
>> Bill
>>
>> On Wed, Aug 19, 2020 at 8:53 AM Pirow Engelbrecht <pirow.engelbre...@etion.co.za> wrote:
>>
>>> Hello,
>>>
>>> We’re building a JSON decorator using Kafka Streams’ processing API.
>>>
>>> Briefly, the process is that a piece of JSON is consumed from an input
>>> topic (keys are null, the value is the JSON). The JSON contains a field
>>> (e.g. “thisField”) with a value (e.g. “someLink”). This value (and a
>>> timestamp) is used to look up another piece of JSON from a key-value topic
>>> (keys are all the different values of “thisField”, values are JSON). This
>>> key-value topic is created by another service in Kafka. This additional
>>> piece of JSON then gets appended to the input JSON, and the result gets
>>> written to an output topic (keys are null, the value is now the original
>>> JSON + lookup JSON).
>>>
>>> To query against a key-value store, ideally I want Kafka Streams to
>>> directly create and update a window key-value store in memory (or on disk)
>>> from my key-value topic in Kafka, but I am unable to find a way to specify
>>> this through the StoreBuilder interface. Does anybody know how to do this?
>>>
>>> Here is my current StoreBuilder code snippet:
>>>
>>> StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.windowStoreBuilder(
>>>     Stores.persistentWindowStore("loopkupStore",
>>>         Duration.ofDays(14600), Duration.ofDays(14600), false),
>>>     Serdes.String(),
>>>     Serdes.String());
>>>
>>> storeBuilder.build();
>>>
>>> Currently my workaround is to have a sink for the key-value store and then
>>> create/update this key-value store using a node in the processing topology,
>>> but this causes issues when the service restarts: the key-value store topic
>>> needs to be consumed from the start to rebuild the store in memory, but
>>> offsets will already have been committed, which prevents the topic from
>>> being consumed from the start. I also cannot use streams.cleanUp(), as this
>>> will reset all the sinks in my topology (my other sink ingests records from
>>> the input topic).
>>>
>>> Thanks
>>>
>>> *Pirow Engelbrecht*
>>> System Engineer
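Pirow's lookup uses both the field value and a timestamp, which is presumably why a window store was tried. The plain-Java sketch below illustrates that requirement; it does not represent any Kafka store type. It keeps every timestamped version per key and returns the latest version at or before the input's timestamp. Several details are assumptions, because the thread only says the lookup JSON is "appended" to the input JSON:

- the "at or before" rule;
- the class name `TimestampedLookup`;
- the `{"input": ..., "lookup": ...}` output shape.

JSON parsing is left out, so the value of "thisField" is passed in already extracted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Plain-Java sketch, NOT a Kafka Streams store: keeps every version of each
// lookup value keyed by timestamp and answers "latest value at or before t".
class TimestampedLookup {
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    // Called once per record of the key-value topic (e.g. from a store-update processor).
    void put(String key, long timestamp, String json) {
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, json);
    }

    // Latest value whose timestamp is <= the given timestamp, if any (assumed semantics).
    Optional<String> lookup(String key, long timestamp) {
        TreeMap<Long, String> byTime = versions.get(key);
        if (byTime == null) {
            return Optional.empty();
        }
        Map.Entry<Long, String> entry = byTime.floorEntry(timestamp);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }

    // Hypothetical output shape: {"input":<input>,"lookup":<lookup or null>}.
    // Both JSON strings are assumed to be valid already; fieldValue is the value
    // of "thisField", extracted beforehand by a JSON library.
    String decorate(String inputJson, String fieldValue, long timestamp) {
        String lookupJson = lookup(fieldValue, timestamp).orElse("null");
        return "{\"input\":" + inputJson + ",\"lookup\":" + lookupJson + "}";
    }
}
```

If only the latest value per key matters, the version map collapses to a plain key-to-JSON map, which is what a key-value global store holds.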
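Liam's suggestion and the restart problem in Pirow's original workaround hinge on the same rule. A consumer in a group resumes from its committed offset, and `auto.offset.reset` is consulted only when there is no valid committed offset. (Kafka Streams already defaults its consumers to "earliest".) The plain-Java model below illustrates that rule for a regular source topic. It is not the consumer's real implementation, and `StartOffsetModel`, `startOffset`, and its parameters are hypothetical names.

```java
import java.util.OptionalLong;

// Plain-Java model, NOT the Kafka consumer API: how a group member picks the
// starting offset for one partition of a regular (non-global) source topic.
class StartOffsetModel {

    // committed: the group's committed offset for the partition, if any.
    // logStart / logEnd: earliest available offset and next offset to be written.
    // reset: the auto.offset.reset policy ("earliest", "latest" or "none").
    static long startOffset(OptionalLong committed, long logStart, long logEnd, String reset) {
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c; // a valid committed offset wins; the reset policy is ignored
            }
        }
        // No committed offset, or it is out of range: fall back to the reset policy.
        if ("earliest".equals(reset)) {
            return logStart;
        }
        if ("latest".equals(reset)) {
            return logEnd;
        }
        // "none": the real consumer surfaces an error to the application here.
        throw new IllegalStateException("no valid committed offset and auto.offset.reset=" + reset);
    }
}
```

With a committed offset of 7, the model returns 7 even under "earliest". That is why a restart of the sink-based workaround did not re-read the lookup topic from the start. None of this applies to the global-store topic, which is always read from the beginning and never commits offsets.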