Matthias,
I tried with default store as well but getting same error, can you please
check if I am initializing the global store in the right way:

public void setupGlobalCacheTables(String theKafkaServers) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, DEFAULT_APPLICATION_ID);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, theKafkaServers);
    StreamsBuilder streamsBuilder = new StreamsBuilder();
    groupCacheTable =
    streamsBuilder.globalTable(GROUP_CACHE_TOPIC,
Consumed.with(Serdes.String(), GroupCacheSerdes.groupCache()),
        Materialized.as(GROUP_CACHE_STORE_NAME));
    Topology groupCacheTopology = streamsBuilder.build();
     kafkaStreams = new KafkaStreams(groupCacheTopology, props);
      kafkaStreams.start();

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Stopping the stream");
kafkaStreams.close();
}));
}

On Wed, May 27, 2020 at 5:06 PM Pushkar Deole <pdeole2...@gmail.com> wrote:

> Hi Matthias,
>
> By the way, I used the in-memory global store and the service is giving
> out of memory error during startup. Unfortunately i don't have a stack
> trace now but when i got stack the first time, the error was coming
> somewhere from memorypool.allocate or similar kind of method. If i get the
> stack trace again, I will share that with you.
> However, the topic from where the store is reading from is empty so I am
> not sure why the global k table is trying to occupy a lot of space. The POD
> memory request and limits are 500 MiB and 750 MiB respectively so the state
> store should fit into this memory I believe since topic is empty. Can you
> provide inputs on this.
>
>
> On Wed, May 27, 2020 at 2:17 PM Pushkar Deole <pdeole2...@gmail.com>
> wrote:
>
>> Ok... got it... is there any hook that I can attach to the global k table
>> or global store? What I mean here is I want to know when the global store
>> is updated with data from topic in that case the hook that I specified
>> should be invoked so i can do some activity like logging that, this will
>> allow me to know how long the global store took to sync up with topic after
>> the event has been put on the topic.
>>
>> On Tue, May 26, 2020 at 10:58 PM Matthias J. Sax <mj...@apache.org>
>> wrote:
>>
>>> For example it could be some "static" information, like a mapping from
>>> zip code to city name.
>>>
>>> Something that does usually not change over time.
>>>
>>>
>>> -Matthias
>>>
>>> On 5/25/20 9:55 PM, Pushkar Deole wrote:
>>> > Matthias,
>>> >
>>> > I am wondering what you mean by "Global store hold "axially" data that
>>> is
>>> > provided from "outside" of the
>>> > app"
>>> >
>>> > will you be able to give some example use case here as to what you
>>> mean by
>>> > axially data provided from outside app?
>>> >
>>> > On Sat, May 2, 2020 at 1:58 AM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>> >
>>> >> Both stores sever a different purpose.
>>> >>
>>> >> Regular stores allow you to store state the application computes.
>>> >> Writing into the changelog is a fault-tolerance mechanism.
>>> >>
>>> >> Global store hold "axially" data that is provided from "outside" of
>>> the
>>> >> app. There is no changelog topic, but only the input topic (that is
>>> used
>>> >> to re-create the global state).
>>> >>
>>> >> Local stores are sharded and updates are "sync" as they don't need to
>>> be
>>> >> shared with anybody else.
>>> >>
>>> >> For global stores, as all instances need to be updated, updates are
>>> >> async (we don't know when which instance will update it's own global
>>> >> store replica).
>>> >>
>>> >>>> Say one stream thread updates the topic for global store and starts
>>> >>>> processing next event wherein the processor tries to read the global
>>> >> store
>>> >>>> which may not have been synced with the topic?
>>> >>
>>> >> Correct. There is no guarantee when the update to the global store
>>> will
>>> >> be applied. As said, global stores are not designed to hold data the
>>> >> application computes.
>>> >>
>>> >>
>>> >> -Matthias
>>> >>
>>> >>
>>> >> On 4/30/20 11:11 PM, Pushkar Deole wrote:
>>> >>> thanks... will try with GlobalKTable.
>>> >>> As a side question, I didn't really understand the significance of
>>> global
>>> >>> state store which kind of works in a reverse way to local state store
>>> >> i.e.
>>> >>> local state store is updated and then saved to changelog topic
>>> whereas in
>>> >>> case of global state store the topic is updated first and then
>>> synced to
>>> >>> global state store. Do these two work in sync i.e. the update to
>>> topic
>>> >> and
>>> >>> global state store ?
>>> >>>
>>> >>> Say one stream thread updates the topic for global store and starts
>>> >>> processing next event wherein the processor tries to read the global
>>> >> store
>>> >>> which may not have been synced with the topic?
>>> >>>
>>> >>> On Fri, May 1, 2020 at 3:35 AM Matthias J. Sax <mj...@apache.org>
>>> wrote:
>>> >>>
>>> >>>> Yes.
>>> >>>>
>>> >>>> A `GlobalKTable` uses a global store internally.
>>> >>>>
>>> >>>> You can also use `StreamsBuilder.addGlobalStore()` or
>>> >>>> `Topology.addGlobalStore()` to add a global store "manually".
>>> >>>>
>>> >>>>
>>> >>>> -Matthias
>>> >>>>
>>> >>>>
>>> >>>> On 4/30/20 7:42 AM, Pushkar Deole wrote:
>>> >>>>> Thanks Matthias.
>>> >>>>> Can you elaborate on the replicated caching layer part?
>>> >>>>> When you say global stores, do you mean GlobalKTable created from a
>>> >> topic
>>> >>>>> e.g. using StreamsBuilder.globalTable(String topic) method ?
>>> >>>>>
>>> >>>>> On Thu, Apr 30, 2020 at 12:44 PM Matthias J. Sax <mj...@apache.org
>>> >
>>> >>>> wrote:
>>> >>>>>
>>> >>>>>> It's not possible to modify state store from "outside".
>>> >>>>>>
>>> >>>>>> If you want to build a "replicated caching layer", you could use
>>> >> global
>>> >>>>>> stores and write into the corresponding topics to update all
>>> stores.
>>> >> Of
>>> >>>>>> course, those updates would be async.
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> -Matthias
>>> >>>>>>
>>> >>>>>> On 4/29/20 10:52 PM, Pushkar Deole wrote:
>>> >>>>>>> Hi All,
>>> >>>>>>>
>>> >>>>>>> I am wondering if this is possible: i have been asked to use
>>> state
>>> >>>> stores
>>> >>>>>>> as a general replicated cache among multiple instances of service
>>> >>>>>> instances
>>> >>>>>>> however the state store is created through streambuilder but is
>>> not
>>> >>>>>>> actually modified through stream processor topology however it
>>> is to
>>> >> be
>>> >>>>>>> modified from outside the stream topology. So, essentially, the
>>> state
>>> >>>>>> store
>>> >>>>>>> is just to be created from streambuilder and then to be used as
>>> an
>>> >>>>>>> application level cache that will get replicated between
>>> application
>>> >>>>>>> instances. Is this possible using state stores?
>>> >>>>>>>
>>> >>>>>>> Secondly, if possible, is this a good design approach?
>>> >>>>>>>
>>> >>>>>>> Appreciate your response since I don't know the internals of
>>> state
>>> >>>>>> stores.
>>> >>>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>>
>>> >>>>
>>> >>>>
>>> >>>
>>> >>
>>> >>
>>> >
>>>
>>>

Reply via email to