Thanks, that comment actually made its way into the documentation already.
Apparently none of that was related. It was a leak: I was not closing an
iterator that was returned by
https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/state/ReadOnlyWindowStore.html#fetch(K,%20long,%20long)
The method's javadoc does not mention that the iterator needs to be closed.
Neither does
https://docs.confluent.io/current/streams/developer-guide.html#querying-local-window-stores
.
Unfortunately I had not read the iterator's javadoc earlier, which does mention
it.
I have a feeling that it would be helpful to add this to the documentation
examples as well as to the javadocs for all methods that return iterators.
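
To illustrate the kind of leak described above, here is a minimal sketch. It
does not use Kafka itself: ClosableIterator and FakeWindowStore are
hypothetical stand-ins that only mimic an iterator that is also Closeable, and
the open-iterator counter exists purely for demonstration. With the real API
the same pattern would presumably be a try-with-resources around the
WindowStoreIterator returned by fetch(key, timeFrom, timeTo).

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a closeable store iterator (not a Kafka type).
interface ClosableIterator<T> extends Iterator<T>, Closeable {
    @Override
    void close(); // narrowed so callers need not handle IOException
}

// Hypothetical stand-in for a window store; tracks iterators left open.
class FakeWindowStore {
    private final List<Long> counts;
    final AtomicInteger openIterators = new AtomicInteger();

    FakeWindowStore(Long... counts) {
        this.counts = Arrays.asList(counts);
    }

    ClosableIterator<Long> fetch() {
        openIterators.incrementAndGet();
        final Iterator<Long> inner = counts.iterator();
        return new ClosableIterator<Long>() {
            private boolean closed = false;

            @Override
            public boolean hasNext() { return inner.hasNext(); }

            @Override
            public Long next() { return inner.next(); }

            @Override
            public void close() {
                // Release the (simulated) resource exactly once.
                if (!closed) {
                    closed = true;
                    openIterators.decrementAndGet();
                }
            }
        };
    }
}

class IteratorLeakDemo {
    // Leaky variant: the iterator is consumed but never closed.
    static long sumLeaky(FakeWindowStore store) {
        Iterator<Long> it = store.fetch();
        long sum = 0;
        while (it.hasNext()) {
            sum += it.next();
        }
        return sum;
    }

    // Fixed variant: try-with-resources closes the iterator on every path.
    static long sumClosed(FakeWindowStore store) {
        try (ClosableIterator<Long> it = store.fetch()) {
            long sum = 0;
            while (it.hasNext()) {
                sum += it.next();
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        FakeWindowStore leaky = new FakeWindowStore(1L, 2L, 3L);
        sumLeaky(leaky);
        System.out.println("open after leaky read: " + leaky.openIterators.get());

        FakeWindowStore closed = new FakeWindowStore(1L, 2L, 3L);
        sumClosed(closed);
        System.out.println("open after closed read: " + closed.openIterators.get());
    }
}
```

Both variants compute the same sum; the difference is only in whether the
resource behind the iterator is released. In a persistent store that resource
is native memory, which would be consistent with the off-heap growth reported
below.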

Best regards,
Stanislav.



2017-09-27 21:53 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

> Have you seen this comment ?
>
> https://issues.apache.org/jira/browse/KAFKA-5122?focusedCommentId=15984467&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15984467
>
> On Wed, Sep 27, 2017 at 12:44 PM, Stas Chizhov <schiz...@gmail.com> wrote:
>
> > Hi,
> >
> > I am running a simple Kafka Streams app (0.11.0.1) that counts messages per
> > hour per partition. The app runs in a Docker container with a memory limit
> > set, which the app always reaches within a few minutes, and then the
> > container is killed. After running it with various numbers of instances,
> > different memory limits, and an in-memory store instead, it looks like it is
> > off-heap memory being taken by RocksDB. After playing with different
> > memory limits, it looks like RocksDB assumes it can grab all the physical
> > memory of the machine, so if the container limit is lower than that, the
> > container gets killed on the way.
> >
> > Also, I have not changed any RocksDB config settings, but the defaults
> > mentioned here:
> > https://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-rocksdb-config
> > look nowhere close to the consumption observed.
> >
> >
> > A few details about the app:
> > I use a windowed store defined as follows:
> >
> > StateStoreSupplier windowCounts = Stores.create(WINDOW_COUNT_STORE)
> >    .withIntegerKeys()
> >    .withLongValues()
> >    .persistent()
> >    .enableCaching()
> >    .windowed(MINUTES.toMillis(1), HOURS.toMillis(5), 10, false)
> >    .build();
> >
> > There is a processor that updates a count for a partition for a timestamp
> > that is rounded to an hour boundary:
> >
> > store.put(
> >    context.partition(),
> >    current(floor(value.getIngressTime())) + 1,
> >    floor(value.getIngressTime())
> > );
> >
> >
> > Any hints on what might cause this, or on config settings to try?
> >
> > Thank you,
> > Stanislav.
> >
>
