`auto.offset.reset` does not apply to global-store-topics.

At startup, the app would always "seek-to-beginning" for a
global-store-topic, bootstrap the global store, and only afterwards start
the actual processing.

However, no offsets are committed for global-store-topics. Maybe this is
the reason why you think no data was read?
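
For illustration, here is a minimal, self-contained sketch of a global
store (topic, store, and processor names are placeholders, not taken from
your app):

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class GlobalStoreExample {

    // state-update processor: copies every record from the global-store topic into the store
    static class StoreUpdater extends AbstractProcessor<String, String> {
        private KeyValueStore<String, String> store;

        @Override
        @SuppressWarnings("unchecked")
        public void init(final ProcessorContext context) {
            super.init(context);
            store = (KeyValueStore<String, String>) context.getStateStore("lookup-store");
        }

        @Override
        public void process(final String key, final String value) {
            store.put(key, value);
        }
    }

    public static void main(final String[] args) {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("lookup-store"),
                Serdes.String(),
                Serdes.String())
            .withLoggingDisabled(); // global stores must not have a changelog

        final Topology topology = new Topology();
        topology.addGlobalStore(
            storeBuilder,
            "lookup-source",
            Serdes.String().deserializer(),
            Serdes.String().deserializer(),
            "lookup-topic",      // the global-store topic
            "lookup-updater",
            StoreUpdater::new);

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-store-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // On every start(), the global thread seeks to the beginning of
        // "lookup-topic" and replays it into "lookup-store" before regular
        // processing begins; `auto.offset.reset` is not consulted and no
        // offsets are committed for this topic.
        final KafkaStreams streams = new KafkaStreams(topology, props);
        streams.start();
    }
}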


-Matthias

On 8/20/20 5:30 AM, Liam Clarke-Hutchinson wrote:
> Hi Pirow,
> 
> You can configure the auto offset reset for your stream source's consumer
> to "earliest" if you want to consume all available data if no committed
> offset exists. This will populate the state store on first run.
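> 
> For example, a minimal sketch of that config (only the reset setting is
> the relevant line, the other properties are placeholders):
> 
> import java.util.Properties;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.kafka.streams.StreamsConfig;
> 
> Properties props = new Properties();
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
> // only used when the consumer group has no committed offset yet
> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");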
> 
> Cheers,
> 
> Liam Clarke-Hutchinson
> 
> 
> On Thu, 20 Aug. 2020, 11:58 pm Pirow Engelbrecht, <
> pirow.engelbre...@etion.co.za> wrote:
> 
>> Hi Bill,
>>
>>
>>
>> Yes, that seems to be exactly what I need. I’ve instantiated this global
>> store with:
>>
>> topology.addGlobalStore(storeBuilder, "KVstoreSource",
>>     Serdes.String().deserializer(), Serdes.String().deserializer(),
>>     this.config_kvStoreTopic, "KVprocessor", KeyValueProcessor::new);
>>
>>
>>
>>
>>
>> I’ve added a KeyValueProcessor that puts incoming Kafka key-value pairs
>> into the store. The problem is that if the application starts for the first
>> time, it does not process any key-value pairs already in the Kafka topic.
>> Is there a way around this?
>>
>>
>>
>> Thanks
>>
>>
>>
>> *Pirow Engelbrecht*
>> System Engineer
>>
>> *E.* pirow.engelbre...@etion.co.za
>> *T.* +27 12 678 9740 (ext. 9879)
>> *M.* +27 63 148 3376
>>
>> 76 Regency Drive | Irene | Centurion | 0157
>>
>>
>>
>> *From:* Bill Bejeck <b...@confluent.io>
>> *Sent:* Wednesday, 19 August 2020 3:53 PM
>> *To:* users@kafka.apache.org
>> *Subject:* Re: Kafka Streams Key-value store question
>>
>>
>>
>> Hi Pirow,
>>
>> If I'm understanding your requirements correctly, I think using a global store
>> <https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addGlobalStore-org.apache.kafka.streams.state.StoreBuilder-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.processor.ProcessorSupplier->
>> will work for you.
>>
>> HTH,
>> Bill
>>
>> On Wed, Aug 19, 2020 at 8:53 AM Pirow Engelbrecht <
>> pirow.engelbre...@etion.co.za> wrote:
>>
>>> Hello,
>>>
>>>
>>>
>>> We’re building a JSON decorator using Kafka Streams’ processing API.
>>>
>>>
>>>
>>> The process is briefly that a piece of JSON is consumed from an input
>>> topic (keys are null, value is the JSON). The JSON contains a field
>>> (e.g. “thisField”) with a value (e.g. “someLink”). This value (and a
>>> timestamp) is used to look up another piece of JSON from a key-value
>>> topic (keys are all the different values of “thisField”, values are
>>> JSON). This key-value topic is created by another service in Kafka.
>>> This additional piece of JSON then gets appended to the input JSON and
>>> the result gets written to an output topic (keys are null, value is now
>>> the original JSON + lookup JSON).
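>>>
>>> As a rough sketch, the decoration step could look like the processor
>>> below (the store name, the hard-coded lookup key, and the string
>>> concatenation are placeholders; real code would extract "thisField" and
>>> merge the JSON with a proper JSON library, and a windowed store would
>>> use a timestamped fetch instead of get()):
>>>
>>> import org.apache.kafka.streams.processor.AbstractProcessor;
>>> import org.apache.kafka.streams.processor.ProcessorContext;
>>> import org.apache.kafka.streams.state.KeyValueStore;
>>>
>>> public class DecoratorProcessor extends AbstractProcessor<String, String> {
>>>
>>>     private KeyValueStore<String, String> lookupStore;
>>>
>>>     @Override
>>>     @SuppressWarnings("unchecked")
>>>     public void init(final ProcessorContext context) {
>>>         super.init(context);
>>>         lookupStore = (KeyValueStore<String, String>) context.getStateStore("lookupStore");
>>>     }
>>>
>>>     @Override
>>>     public void process(final String key, final String inputJson) {
>>>         // placeholder: extract the value of "thisField" from inputJson with a JSON library
>>>         final String lookupKey = "someLink";
>>>         final String extraJson = lookupStore.get(lookupKey);
>>>         if (extraJson != null) {
>>>             // placeholder: properly merge the two JSON documents instead of concatenating
>>>             context().forward(key, inputJson + extraJson);
>>>         }
>>>     }
>>> }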
>>>
>>>
>>>
>>> To do the query against a key-value store, ideally I want Kafka Streams
>>> to directly create and update a window key-value store in memory (or on
>>> disk) from my key-value topic in Kafka, but I am unable to find a way
>>> to specify this through the StoreBuilder interface. Does anybody know
>>> how to do this?
>>>
>>> Here is my current StoreBuilder code snippet:
>>>
>>> StoreBuilder<WindowStore<String, String>> storeBuilder =
>>>     Stores.windowStoreBuilder(
>>>         Stores.persistentWindowStore("loopkupStore",
>>>             Duration.ofDays(14600), Duration.ofDays(14600), false),
>>>         Serdes.String(),
>>>         Serdes.String());
>>> storeBuilder.build();
>>>
>>>
>>>
>>>
>>>
>>> Currently my workaround is to have a sink for the key-value store and
>>> then create/update this key-value store using a node in the processing
>>> topology, but this has issues when restarting the service, i.e. when
>>> the service is restarted, the key-value store topic needs to be
>>> consumed from the start to rebuild the store in memory, but the sink
>>> would have written commit offsets which prevent the topic from being
>>> consumed from the start. I also cannot use streams.cleanUp() as this
>>> will reset all the sinks in my topology (my other sink ingests records
>>> from the input topic).
>>>
>>>
>>>
>>> Thanks
>>>
>>>
>>>
>>> *Pirow Engelbrecht*
>>> System Engineer
>>>
>>> *E.* pirow.engelbre...@etion.co.za
>>> *T.* +27 12 678 9740 (ext. 9879)
>>> *M.* +27 63 148 3376
>>>
>>> 76 Regency Drive | Irene | Centurion | 0157
>>>
>>>
>>>
>>
> 
