Hi Guozhang,

Please find the details below. I have also tried with the latest 2.0.0 libraries and observed no improvement.
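The 8 GB "max stream cache" listed below is presumably set through cache.max.bytes.buffering; here is a minimal sketch of that configuration (the application id, bootstrap servers, and property values are illustrative assumptions, not taken from this thread):

---------------------------------------
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-usage-app");  // hypothetical app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // hypothetical broker
// 8 GB record cache, shared across all stream threads of this instance
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 8L * 1024 * 1024 * 1024);
---------------------------------------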
Kafka version - 1.0.1
Total memory allocated - 24 GB
Max stream cache - 8 GB

---------------------------------------
Processor class code (the context, currentVal, and punctuateMs fields and the
handleTasks() method are implied by the snippet but were not included):

private ProcessorContext context;
private HourlyUsage currentVal;
private long punctuateMs;

private KeyValueStore<String, HourlyUsage> hourlyStore = null;       // Local store
private KeyValueStore<String, Integer> hourlyProcessedStore = null;  // Local store

@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    this.context = context;
    // Stores the hourly JSON payload
    this.hourlyStore = (KeyValueStore<String, HourlyUsage>) context.getStateStore("kvshourly");
    // Stores just the keys already sent downstream
    this.hourlyProcessedStore = (KeyValueStore<String, Integer>) context.getStateStore("kvshourlyprocessed");
    this.context.schedule(punctuateMs, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
        @Override
        public void punctuate(long timestamp) {
            handleTasks();  // periodic housekeeping (implementation elided)
        }
    });
}

@Override
public void process(String key, HourlyUsage newVal) {
    if (hourlyProcessedStore.get(key) == null) {       // skip keys already forwarded
        currentVal = hourlyStore.get(key);
        if (currentVal != null) {
            currentVal.flattenRecord(newVal);          // merge the new record into the stored one
            hourlyStore.put(key, currentVal);
            if (currentVal.hourlyCompleted()) {
                context.forward(key, currentVal, "materializehourly");
                hourlyProcessedStore.put(key, 0);      // mark the key as processed
            }
            currentVal = null;
        } else {
            hourlyStore.put(key, newVal);              // first record for this key
        }
    }
}
---------------------------------------

Thanks,
Ashok

On Fri, Aug 17, 2018 at 3:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello AshokKumar,
>
> Which version of Kafka are you using? And could you share your code
> snippet for us to help investigate the issue (you can omit any concrete
> business logic; just the sketch of the code is fine)?
>
> Guozhang
>
> On Fri, Aug 17, 2018 at 8:52 AM, AshokKumar J <ashokkumar...@gmail.com>
> wrote:
>
> > Hi,
> > Any thoughts on the issue below? The behavior should be reproducible
> > if we perform both the put and the get against the store (cache
> > enabled) while processing each record from the topic, with a volume of
> > 2-3 million records every 15 minutes and each JSON averaging roughly
> > 400 to 500 KB. Over time the app runs out of the total memory within
> > 24 hours.
> >
> > Thanks,
> > Ashok
> >
> > On Wed, Aug 15, 2018 at 5:15 AM, AshokKumar J <ashokkumar...@gmail.com>
> > wrote:
> >
> > > Disabling the stream cache prevents the unbounded memory usage, but
> > > the throughput is low (even with the RocksDB cache enabled). Can you
> > > please advise why the references to the cache objects are not
> > > released in time (for GC cleanup) and instead grow continuously?
> > >
> > > On Tue, Aug 14, 2018 at 11:17 PM, AshokKumar J <ashokkumar...@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > We have a streams application that uses the low-level Processor
> > > > API. We persist the data into a key-value state store. For each
> > > > record we retrieve from the topic we perform a lookup against the
> > > > store to see if it exists; if it does, we update the existing
> > > > record, else we simply add the new one. With this we are running
> > > > into a significant memory issue: whatever memory we allocate gets
> > > > fully utilized, and all the objects end up in the old generation.
> > > > Caching has been enabled, and we allotted 40% of the total memory
> > > > to the cache. Say the total application memory is 24 GB and the
> > > > cache size is 12 GB; ideally we would expect 12 GB to reside in the
> > > > old generation and the rest to stay in the young generation, but
> > > > for some reason everything moves into the old generation, and we
> > > > eventually run out of memory within a day. Please see the object
> > > > dominator tree below and kindly suggest.
> > > >
> > > > https://files.slack.com/files-pri/T47H7EWH0-FC8EZ9L66/image.png
> > > >
>
> --
> -- Guozhang
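For readers of this thread: the workaround described in the quoted messages (disabling the stream cache) can be applied either globally or per store. A minimal sketch, assuming the store name from the snippet above and a hypothetical hourlyUsageSerde for the HourlyUsage type:

---------------------------------------
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

// Option 1: disable the record cache for the whole application
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

// Option 2: keep the global cache but opt individual stores out of it
StoreBuilder<KeyValueStore<String, HourlyUsage>> hourlyStoreBuilder =
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("kvshourly"),
            Serdes.String(),
            hourlyUsageSerde)  // hypothetical serde for HourlyUsage
        .withCachingDisabled();
---------------------------------------

With the cache disabled, every put() is written through to RocksDB immediately, which is consistent with the lower throughput reported above.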