Hi Guozhang,

Thanks for the input. Yes, confirmed: enabling and overriding the RocksDB config setter class (with default parameters) on top of the Kafka Streams cache leads to unbounded memory usage. After removing the override, the application's memory usage stays consistently within 24GB. Can this be added to the Kafka Streams FAQ?
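For anyone hitting this thread later: the interface involved is RocksDBConfigSetter, registered via the rocksdb.config.setter config. Below is only a minimal sketch of a setter that bounds per-store memory; the class name and the sizes are my own illustrative assumptions, not what we actually ran (ours just passed the defaults through). The relevant point is that each state store gets its own RocksDB instance, so any per-store block cache and memtable settings multiply across stores and partitions on top of the streams cache.

---------------------------------------
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;

// Illustrative sketch only, not the setter from our application.
public class BoundedRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        // Cap the per-store block cache (the sizes here are assumptions).
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCacheSize(16 * 1024 * 1024L);
        options.setTableFormatConfig(tableConfig);

        // Cap the per-store memtables as well.
        options.setWriteBufferSize(16 * 1024 * 1024L);
        options.setMaxWriteBufferNumber(2);
    }
}
---------------------------------------

Registered alongside the streams cache size, e.g.:

props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BoundedRocksDBConfig.class);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 8L * 1024 * 1024 * 1024); // 8 GB streams cache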
Ashok

On Wed, Aug 22, 2018 at 6:03 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Ashok,
>
> Your implementation looks okay to me: I did not know how "handleTasks" is
> implemented, just that if you are iterating over the store, you'd need to
> close the iterator after you have used it.
>
> One thing I suspect is that your memory usage, combining the streams cache
> plus RocksDB's own buffering, may simply be running beyond 24GB. You can
> take a look at this JIRA comment and see if it is similar to your scenario
> (note your application is only using key-value stores, so it should not have
> the segmentation amplification factor):
>
> https://issues.apache.org/jira/browse/KAFKA-5122?focusedCommentId=15984467&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15984467
>
> Guozhang
>
> On Sun, Aug 19, 2018 at 7:18 PM, AshokKumar J <ashokkumar...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Please find below. I have tried with the latest 2.0.0 libraries and no
> > improvement observed.
> >
> > Kafka version - 1.0.1
> > Total memory allocated - 24 GB
> > Max stream cache - 8 GB
> >
> > ---------------------------------------
> > Processor class code:
> >
> > private KeyValueStore<String, HourlyUsage> hourlyStore = null; // Local store
> > private KeyValueStore<String, Integer> hourlyProcessedStore = null; // Local store
> >
> > @Override
> > public void init(ProcessorContext context) {
> >     this.context = context;
> >     // Stores the hourly JSON payload
> >     this.hourlyStore = (KeyValueStore<String, HourlyUsage>) context.getStateStore("kvshourly");
> >     // Stores just the key sent downstream
> >     this.hourlyProcessedStore = (KeyValueStore<String, Integer>) context.getStateStore("kvshourlyprocessed");
> >
> >     this.context.schedule(punctuateMs, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
> >         public void punctuate(long timestamp) {
> >             handleTasks();
> >         }
> >     });
> > }
> >
> > @Override
> > public void process(String key, HourlyUsage newVal) {
> >     if (hourlyProcessedStore.get(key) == null) {
> >         currentVal = hourlyStore.get(key);
> >
> >         if (currentVal != null) {
> >             currentVal.flattenRecord(newVal);
> >             hourlyStore.put(key, currentVal);
> >
> >             if (currentVal.hourlyCompleted()) {
> >                 context.forward(key, currentVal, "materializehourly");
> >                 hourlyProcessedStore.put(key, 0);
> >             }
> >             currentVal = null;
> >         }
> >         else {
> >             hourlyStore.put(key, newVal);
> >         }
> >     }
> > }
> > ---------------------------------------
> >
> > Thanks,
> > Ashok
> >
> > On Fri, Aug 17, 2018 at 3:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello AshokKumar,
> > >
> > > Which version of Kafka are you using? And could you share your code
> > > snippet for us to help investigate the issue (you can omit any concrete
> > > logic that involves your business logic, just the sketch of the code is
> > > fine).
> > >
> > > Guozhang
> > >
> > > On Fri, Aug 17, 2018 at 8:52 AM, AshokKumar J <ashokkumar...@gmail.com> wrote:
> > >
> > > > Hi,
> > > > Any thoughts on the below issue? I think the behavior should be
> > > > reproducible if we perform both the put and the get against the store
> > > > (cache enabled) when processing each record from the topic, with a
> > > > processing volume of 2-3 million records every 15 mins, each JSON
> > > > averaging roughly 400 to 500 KB. Over time the app runs out of the
> > > > total memory within 24 hours.
> > > >
> > > > Thanks,
> > > > Ashok
> > > >
> > > > On Wed, Aug 15, 2018 at 5:15 AM, AshokKumar J <ashokkumar...@gmail.com> wrote:
> > > >
> > > > > Disabling the stream cache prevents the unbounded memory usage;
> > > > > however, the throughput is low (with the RocksDB cache enabled). Can
> > > > > you please advise why the cache object references don't get released
> > > > > in time (for GC cleanup) and grow continuously?
> > > > >
> > > > > On Tue, Aug 14, 2018 at 11:17 PM, AshokKumar J <ashokkumar...@gmail.com> wrote:
> > > > >
> > > > >> Hi,
> > > > >>
> > > > >> We have a streams application that uses the low-level API. We persist
> > > > >> the data into a key-value state store. For each record that we
> > > > >> retrieve from the topic we perform a lookup against the store to see
> > > > >> if it exists; if it does, we update the existing record, else we
> > > > >> simply add the new one. With this we are running into a significant
> > > > >> memory issue: basically whatever memory we allocate gets fully
> > > > >> utilized (all the objects go into the older generations). Caching has
> > > > >> been enabled and we allocated 40% of the total memory to the cache.
> > > > >> Let's say we have a total application memory of 24GB and we specify
> > > > >> the cache size as 12GB; ideally we would expect 12GB to reside in the
> > > > >> old generation and the rest to stay young, but for some reason
> > > > >> everything is going into the old generation and eventually we run out
> > > > >> of memory within a day. Please see the object dominator tree below.
> > > > >> Kindly suggest.
> > > > >>
> > > > >> https://files.slack.com/files-pri/T47H7EWH0-FC8EZ9L66/image.png
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
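PS: on Guozhang's earlier point about closing the iterator in handleTasks, since we never posted that method, here is only a hypothetical sketch (the per-entry work is a placeholder, not our real logic). The part that matters is that the KeyValueIterator is closed via try-with-resources, so each punctuation releases the native RocksDB iterator instead of leaking it.

---------------------------------------
// Belongs inside the processor shown above; requires imports for
// org.apache.kafka.streams.KeyValue and
// org.apache.kafka.streams.state.KeyValueIterator.
private void handleTasks() {
    try (KeyValueIterator<String, HourlyUsage> iter = hourlyStore.all()) {
        while (iter.hasNext()) {
            KeyValue<String, HourlyUsage> entry = iter.next();
            // placeholder: inspect or expire the entry as needed
        }
    } // iterator closed here even if the loop throws
}
---------------------------------------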