Hi Bill,

Yes, that seems to be exactly what I need. I’ve instantiated this global store 
with:
topology.addGlobalStore(storeBuilder, "KVstoreSource", 
Serdes.String().deserializer(), Serdes.String().deserializer(), 
this.config_kvStoreTopic, "KVprocessor", KeyValueProcessor::new);


I’ve added a KeyValueProcessor that puts incoming Kafka key-value pairs into 
the store. The problem is that if the application starts for the first time, it 
does not process any key-value pairs already in the Kafka topic. Is there a way 
around this?

Thanks

Pirow Engelbrecht
System Engineer

E. 
pirow.engelbre...@etion.co.za<file:///C:/Users/adm_rudolph/Desktop/em...@etion.co.za>
T. +27 12 678 9740 (ext. 9879)
M. +27 63 148 3376

76 Regency Drive | Irene | Centurion | 0157<https://goo.gl/maps/v9ZbwjqpPyL2>
www.etion.co.za<https://www.parsec.co.za/>


[cid:image001.jpg@01D67642.9B5709A0]<https://www.parsec.co.za/>


Facebook<https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr> | 
YouTube<https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A> | 
LinkedIn<https://www.linkedin.com/company/etionltd> | 
Twitter<https://twitter.com/Etionlimited> | 
Instagram<https://www.instagram.com/Etionlimited/>


From: Bill Bejeck <b...@confluent.io>
Sent: Wednesday, 19 August 2020 3:53 PM
To: users@kafka.apache.org
Subject: Re: Kafka Streams Key-value store question

Hi Pirow,

If I'm understanding your requirements correctly, I think using a global
store
<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addGlobalStore-org.apache.kafka.streams.state.StoreBuilder-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.processor.ProcessorSupplier-<https://kafka.apache.org/26/javadoc/org/apache/kafka/streams/StreamsBuilder.html#addGlobalStore-org.apache.kafka.streams.state.StoreBuilder-java.lang.String-org.apache.kafka.streams.kstream.Consumed-org.apache.kafka.streams.processor.ProcessorSupplier->>
will
work for you.

HTH,
Bill

On Wed, Aug 19, 2020 at 8:53 AM Pirow Engelbrecht <
pirow.engelbre...@etion.co.za<mailto:pirow.engelbre...@etion.co.za>> wrote:

> Hello,
>
>
>
> We’re building a JSON decorator using Kafka Streams’ processing API.
>
>
>
> The process is briefly that a piece of JSON should be consumed from an
> input topic (keys are null, value is the JSON). The JSON contains a field
> (e.g. “thisField”) with a value (e.g. “someLink”) . This value (and a
> timestamp) is used to look-up another piece JSON from a key-value topic
> (keys are all the different values of “thisField”, values are JSON). This
> key-value topic is created by another service in Kafka. This additional
> piece of JSON then gets appended to the input JSON and the result gets
> written to an output topic (keys are null, value is now the original JSON +
> lookup JSON).
>
>
>
> To do the query against a key-value store, ideally I want Kafka Streams to
> directly create and update a window key-value store in memory (or disk)
> from my key-value topic in Kafka, but I am unable to find a way to specify
> this through the StoreBuilder interface. Does anybody know how to do this?
>
> Here is my current Storebuilder code snippet:
>
> StoreBuilder<WindowStore<String, String>> storeBuilder = Stores.
> windowStoreBuilder(
>
> Stores.persistentWindowStore("loopkupStore",
> Duration.ofDays(14600), Duration.ofDays(14600), false),
>
> Serdes.String(),
>
> Serdes.String());
>
> storeBuilder.build();
>
>
>
>
>
> Currently my workaround is to have a sink for the key-value store and then
> create/update this key-value store using a node in the processing topology,
> but this has issues when restarting the service, i.e. when the service is
> restarted, the key-value store topic needs to be consumed from the start to
> rebuild the store in memory, but the sink would have written commit offsets
> which prevents the topic to be consumed from the start. I also cannot use
> streams.cleanUp() as this will reset all the sinks in my topology (y other
> sink ingests records from the input topic).
>
>
>
> Thanks
>
>
>
> *Pirow Engelbrecht*
> System Engineer
>
> *E.* pirow.engelbre...@etion.co.za<mailto:pirow.engelbre...@etion.co.za>
> *T.* +27 12 678 9740 (ext. 9879)
> *M.* +27 63 148 3376
>
> 76 Regency Drive | Irene | Centurion | 0157
> <https://goo.gl/maps/v9ZbwjqpPyL2<https://goo.gl/maps/v9ZbwjqpPyL2>>
> *www.etion.co.za<http://www.etion.co.za> 
> <https://www.parsec.co.za/<https://www.parsec.co.za>>*
>
> <https://www.parsec.co.za/<https://www.parsec.co.za/>>
>
> Facebook
> <https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr<https://www.facebook.com/Etion-Limited-2194612947433812?_rdc=1&_rdr>>
>  |
> YouTube 
> <https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A<https://www.youtube.com/channel/UCUY-5oeACtLk2uTsEjZCU6A>>
>  |
> LinkedIn 
> <https://www.linkedin.com/company/etionltd<https://www.linkedin.com/company/etionltd>>
>  | Twitter
> <https://twitter.com/Etionlimited<https://twitter.com/Etionlimited>> | 
> Instagram
> <https://www.instagram.com/Etionlimited/<https://www.instagram.com/Etionlimited>>
>
>
>

Reply via email to