Matthias, I tried with the default store as well but I am getting the same error. Can you please check whether I am initializing the global store the right way:

public void setupGlobalCacheTables(String theKafkaServers) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    groupCacheTable = streamsBuilder.globalTable(
            GROUP_CACHE_TOPIC,
            Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
            Materialized.as(GROUP_CACHE_STORE_NAME));

    Topology groupCacheTopology = streamsBuilder.build();
    kafkaStreams = new KafkaStreams(groupCacheTopology, props);
    kafkaStreams.start();

    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        LOG.info("Stopping the stream");
        kafkaStreams.close();
    }));
}

On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pdeole2...@gmail.com> wrote:
> Hi Matthias,
>
> By the way, I used the in-memory global store and the service is giving
> out of memory error during startup. Unfortunately i don't have a stack
> trace now but when i got stack the first time, the error was coming
> somewhere from memorypool.allocate or similar kind of method. If i get the
> stack trace again, I will share that with you.
> However, the topic from where the store is reading from is empty so I am
> not sure why the global k table is trying to occupy a lot of space. The POD
> memory request and limits are 500 MiB and 750 MiB respectively so the state
> store should fit into this memory I believe since topic is empty. Can you
> provide inputs on this.
>
>
> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pdeole2...@gmail.com>
> wrote:
>
>> Ok... got it... is there any hook that I can attach to the global k table
>> or global store? What I mean here is I want to know when the global store
>> is updated with data from topic in that case the hook that I specified
>> should be invoked so i can do some activity like logging that, this will
>> allow me to know how long the global store took to sync up with topic after
>> the event has been put on the topic.
>>
>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>
>>> For example it could be some "static" information, like a mapping from
>>> zip code to city name.
>>>
>>> Something that does usually not change over time.
>>>
>>>
>>> -Matthias
>>>
>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>>> > Matthias,
>>> >
>>> > I am wondering what you mean by "Global store hold "axially" data that is
>>> > provided from "outside" of the app"
>>> >
>>> > will you be able to give some example use case here as to what you mean by
>>> > axially data provided from outside app?
>>> >
>>> > On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org> wrote:
>>> >
>>> >> Both stores serve a different purpose.
>>> >>
>>> >> Regular stores allow you to store state the application computes.
>>> >> Writing into the changelog is a fault-tolerance mechanism.
>>> >>
>>> >> Global store hold "axially" data that is provided from "outside" of the
>>> >> app. There is no changelog topic, but only the input topic (that is used
>>> >> to re-create the global state).
>>> >>
>>> >> Local stores are sharded and updates are "sync" as they don't need to be
>>> >> shared with anybody else.
>>> >>
>>> >> For global stores, as all instances need to be updated, updates are
>>> >> async (we don't know when which instance will update its own global
>>> >> store replica).
>>> >>
>>> >>>> Say one stream thread updates the topic for global store and starts
>>> >>>> processing next event wherein the processor tries to read the global store
>>> >>>> which may not have been synced with the topic?
>>> >>
>>> >> Correct. There is no guarantee when the update to the global store will
>>> >> be applied. As said, global stores are not designed to hold data the
>>> >> application computes.
>>> >>
>>> >>
>>> >> -Matthias
>>> >>
>>> >>
>>> >> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>>> >>> thanks... will try with GlobalKTable.
>>> >>> As a side question, I didn't really understand the significance of global
>>> >>> state store which kind of works in a reverse way to local state store i.e.
>>> >>> local state store is updated and then saved to changelog topic whereas in
>>> >>> case of global state store the topic is updated first and then synced to
>>> >>> global state store. Do these two work in sync i.e. the update to topic and
>>> >>> global state store ?
>>> >>>
>>> >>> Say one stream thread updates the topic for global store and starts
>>> >>> processing next event wherein the processor tries to read the global store
>>> >>> which may not have been synced with the topic?
>>> >>>
>>> >>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org> wrote:
>>> >>>
>>> >>>> Yes.
>>> >>>>
>>> >>>> A `GlobalKTable` uses a global store internally.
>>> >>>>
>>> >>>> You can also use `StreamsBuilder.addGlobalStore()` or
>>> >>>> `Topology.addGlobalStore()` to add a global store "manually".
>>> >>>>
>>> >>>>
>>> >>>> -Matthias
>>> >>>>
>>> >>>>
>>> >>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>> >>>>> Thanks Matthias.
>>> >>>>> Can you elaborate on the replicated caching layer part?
>>> >>>>> When you say global stores, do you mean GlobalKTable created from a topic
>>> >>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>>> >>>>>
>>> >>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org> wrote:
>>> >>>>>
>>> >>>>>> It's not possible to modify state store from "outside".
>>> >>>>>>
>>> >>>>>> If you want to build a "replicated caching layer", you could use global
>>> >>>>>> stores and write into the corresponding topics to update all stores. Of
>>> >>>>>> course, those updates would be async.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> -Matthias
>>> >>>>>>
>>> >>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>> >>>>>>> Hi All,
>>> >>>>>>>
>>> >>>>>>> I am wondering if this is possible: i have been asked to use state stores
>>> >>>>>>> as a general replicated cache among multiple instances of service instances
>>> >>>>>>> however the state store is created through streambuilder but is not
>>> >>>>>>> actually modified through stream processor topology however it is to be
>>> >>>>>>> modified from outside the stream topology. So, essentially, the state store
>>> >>>>>>> is just to be created from streambuilder and then to be used as an
>>> >>>>>>> application level cache that will get replicated between application
>>> >>>>>>> instances. Is this possible using state stores?
>>> >>>>>>>
>>> >>>>>>> Secondly, if possible, is this a good design approach?
>>> >>>>>>>
>>> >>>>>>> Appreciate your response since I don't know the internals of state stores.
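
For anyone following the thread, here is a minimal sketch of the ordering discussed above: the write goes to the topic first, and each instance's global store replica applies it later on its own schedule, so a read right after the write can be stale. This is a toy model in plain Java, not Kafka Streams code. GlobalStoreModel, Topic, Replica and catchUp() are hypothetical names made up for illustration, and the explicit catchUp() call only stands in for whatever the real global store update thread does in the background.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: no Kafka classes are used, and all names here are hypothetical. */
final class GlobalStoreModel {

    /** Stands in for the input topic: an append-only log of key/value records. */
    static final class Topic {
        private final List<String[]> log = new ArrayList<>();

        void send(String key, String value) {
            log.add(new String[] {key, value});
        }

        int endOffset() {
            return log.size();
        }

        String[] recordAt(int offset) {
            return log.get(offset);
        }
    }

    /** Stands in for one application instance's replica of the global store. */
    static final class Replica {
        private final Topic topic;
        private final Map<String, String> store = new HashMap<>();
        private int nextOffset = 0;

        Replica(Topic topic) {
            this.topic = topic;
        }

        /** Applies every record this replica has not seen yet. */
        void catchUp() {
            while (nextOffset < topic.endOffset()) {
                String[] record = topic.recordAt(nextOffset++);
                store.put(record[0], record[1]);
            }
        }

        String get(String key) {
            return store.get(key);
        }
    }

    public static void main(String[] args) {
        Topic topic = new Topic();
        Replica instanceA = new Replica(topic);
        Replica instanceB = new Replica(topic);

        topic.send("group-1", "v1");                  // the write goes to the topic only
        System.out.println(instanceA.get("group-1")); // null: replica A has not caught up yet

        instanceA.catchUp();
        System.out.println(instanceA.get("group-1")); // v1
        System.out.println(instanceB.get("group-1")); // null: replica B is still behind
    }
}
```

In this model a processor that writes to the topic and immediately reads its local replica gets the old value until that replica catches up, which is the behaviour Matthias confirms for global stores.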