Hi Guozhang,

Please find the details below.  I have also tried the latest 2.0.0 libraries,
and no improvement was observed.


Kafka version - 1.0.1
Total Memory allocated - 24 GB
Max Stream Cache - 8GB
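
For reference, the cache figure above is set via cache.max.bytes.buffering.
A minimal sketch of the relevant configuration (the application id and
bootstrap servers here are placeholders; all other settings omitted):

---------------------------------------
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hourly-usage-app"); // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
// 8 GB record cache, matching the "Max Stream Cache" figure above
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 8L * 1024 * 1024 * 1024);
---------------------------------------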

---------------------------------------
Processor class code (field declarations are included here for completeness;
the class implements Processor<String, HourlyUsage>, and the punctuation
interval value is elided):

private ProcessorContext context;
private long punctuateMs; // punctuation interval in ms (value elided here)
private HourlyUsage currentVal = null;

private KeyValueStore<String, HourlyUsage> hourlyStore = null; // Local store
private KeyValueStore<String, Integer> hourlyProcessedStore = null; // Local store

@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
    this.context = context;
    // Stores the hourly JSON payload
    this.hourlyStore = (KeyValueStore<String, HourlyUsage>) context.getStateStore("kvshourly");
    // Stores just the keys already sent downstream
    this.hourlyProcessedStore = (KeyValueStore<String, Integer>) context.getStateStore("kvshourlyprocessed");

    // Periodic housekeeping driven by wall-clock time
    this.context.schedule(punctuateMs, PunctuationType.WALL_CLOCK_TIME, new Punctuator() {
        @Override
        public void punctuate(long timestamp) {
            handleTasks();
        }
    });
}

@Override
public void process(String key, HourlyUsage newVal) {
    // Skip keys that have already been forwarded downstream
    if (hourlyProcessedStore.get(key) == null) {
        currentVal = hourlyStore.get(key);

        if (currentVal != null) {
            // Merge the new record into the existing one and write it back
            currentVal.flattenRecord(newVal);
            hourlyStore.put(key, currentVal);

            if (currentVal.hourlyCompleted()) {
                context.forward(key, currentVal, "materializehourly");
                hourlyProcessedStore.put(key, 0); // mark the key as processed
            }
            currentVal = null; // drop the reference so it can be GC'd
        }
        else {
            hourlyStore.put(key, newVal);
        }
    }
}
---------------------------------------
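
For context, this is roughly how the processor and stores are wired into the
topology (a sketch only: "HourlyUsageProcessor", the topic names, and the
value serde are placeholders; the store and sink names come from the code
above):

---------------------------------------
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.Stores;

Topology topology = new Topology();
topology.addSource("source", "hourly-usage-topic"); // placeholder topic
topology.addProcessor("hourlyProcessor", HourlyUsageProcessor::new, "source");
topology.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("kvshourly"),
        Serdes.String(), hourlyUsageSerde), // placeholder value serde
    "hourlyProcessor");
topology.addStateStore(
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("kvshourlyprocessed"),
        Serdes.String(), Serdes.Integer()),
    "hourlyProcessor");
// the sink name matches the child name used in context.forward(...)
topology.addSink("materializehourly", "hourly-output-topic", "hourlyProcessor"); // placeholder topic
---------------------------------------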

Thanks,
Ashok

On Fri, Aug 17, 2018 at 3:11 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello AshokKumar,
>
> Which version of Kafka are you using? And could you share your code snippet
> for us to help investigate the issue? (You can omit any concrete business
> logic; just the sketch of the code is fine.)
>
>
> Guozhang
>
> On Fri, Aug 17, 2018 at 8:52 AM, AshokKumar J <ashokkumar...@gmail.com>
> wrote:
>
> > Hi,
> > Any thoughts on the issue below?  I think the behavior should be
> > reproducible if we perform both a put and a get against the store (cache
> > enabled) while processing each record from the topic, at a volume of 2-3
> > million records every 15 minutes, with each JSON payload averaging
> > roughly 400 to 500 KB.  Over time the app exhausts the total allocated
> > memory within 24 hours.
> >
> > Thanks,
> > Ashok
> >
> > On Wed, Aug 15, 2018 at 5:15 AM, AshokKumar J <ashokkumar...@gmail.com>
> > wrote:
> >
> > > Disabling the stream cache prevents the unbounded memory usage, but the
> > > throughput is then low (even with the RocksDB cache enabled).  Can you
> > > please advise why the cached object references aren't released in time
> > > (for GC cleanup) and instead grow continuously?
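> > >
> > > For reference, the cache was disabled by setting its size to zero (a
> > > sketch of the relevant line; props is the StreamsConfig properties
> > > object):
> > >
> > > props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);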
> > >
> > > On Tue, Aug 14, 2018 at 11:17 PM, AshokKumar J <ashokkumar...@gmail.com>
> > > wrote:
> > >
> > >> Hi,
> > >>
> > >> we have a stream application that uses the low-level API.  We persist
> > >> the data into a key-value state store.  For each record that we
> > >> retrieve from the topic we perform a lookup against the store to see
> > >> if it exists; if it does, we update the existing record, else we
> > >> simply add the new one.  With this we are running into a significant
> > >> memory issue: basically, whatever memory we allocate gets fully
> > >> utilized (all the objects go into the older generations).  Caching has
> > >> been enabled and we assigned 40% of the total memory to the cache.
> > >> Let's say we have a total application memory of 24GB and we specify
> > >> the cache size as 12GB; ideally we would expect 12GB to reside in the
> > >> old generation and the rest to stay young, but for some reason
> > >> everything goes into the old generation and eventually we run out of
> > >> memory within a day.  Please see the object dominator tree below.
> > >> Kindly suggest.
> > >>
> > >> https://files.slack.com/files-pri/T47H7EWH0-FC8EZ9L66/image.png
> > >>
> > >>
> > >
> >
>
>
>
> --
> -- Guozhang
>
