Thanks, that comment has actually made its way into the documentation already. Apparently none of that was related. It was a leak: I was not closing an iterator that was returned by
https://kafka.apache.org/0110/javadoc/org/apache/kafka/streams/state/ReadOnlyWindowStore.html#fetch(K,%20long,%20long)
The method's javadoc does not mention that the iterator needs to be closed. Neither does
https://docs.confluent.io/current/streams/developer-guide.html#querying-local-window-stores
Unfortunately, I had not read the iterator's javadoc earlier, which does mention it. I have a feeling that it would be helpful to add this to the documentation examples as well as to the javadocs of all methods that return iterators.
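For anyone hitting the same thing, here is a minimal sketch of the pattern that avoids the leak. It relies on the iterator implementing java.io.Closeable (which WindowStoreIterator does). TrackingIterator is a hypothetical stand-in and not Kafka code; it is there only so the snippet compiles on its own and so the number of unclosed iterators can be observed.

```java
import java.io.Closeable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

class IteratorCloseSketch {

    /** Hypothetical stand-in for a store iterator that holds a resource until closed. */
    static final class TrackingIterator implements Iterator<Long>, Closeable {
        static int open = 0;              // iterators created but not yet closed
        private final Iterator<Long> delegate;
        private boolean closed = false;

        TrackingIterator(List<Long> values) {
            this.delegate = values.iterator();
            open++;
        }

        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public Long next() { return delegate.next(); }

        @Override public void close() {
            if (!closed) {                // closing twice is harmless
                closed = true;
                open--;
            }
        }
    }

    /** Sums all values; try-with-resources closes the iterator even if the loop throws. */
    static long sum(List<Long> values) {
        long total = 0;
        try (TrackingIterator it = new TrackingIterator(values)) {
            while (it.hasNext()) {
                total += it.next();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        long total = sum(Arrays.asList(1L, 2L, 3L));
        System.out.println("total=" + total + ", open iterators=" + TrackingIterator.open);
    }
}
```

With a real window store, the try header would wrap the store.fetch(key, timeFrom, timeTo) call instead of the stand-in constructor; the rest of the loop stays the same.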
Best regards,
Stanislav.

2017-09-27 21:53 GMT+02:00 Ted Yu <yuzhih...@gmail.com>:

> Have you seen this comment ?
>
> https://issues.apache.org/jira/browse/KAFKA-5122?focusedCommentId=15984467&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15984467
>
> On Wed, Sep 27, 2017 at 12:44 PM, Stas Chizhov <schiz...@gmail.com> wrote:
>
> > Hi,
> >
> > I am running a simple kafka streams app (0.11.0.1) that counts messages per
> > hour per partition. The app runs in a docker container with a memory limit
> > set, which is always reached by the app within few minutes and then
> > container is killed. After running it with various number of instances,
> > different memory limits and in-memory store instead - it looks like it is
> > off-heap memory being taken by rocks db. After playing with different
> > memory limits it looks like rocksdb assumes it can grab all the physical
> > memory of the machine, so if the container limit is less than it gets
> > killed on the way.
> >
> > Also I have not changed any rocksdb config settings, but the defaults
> > mentioned here:
> > https://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-rocksdb-config
> > looks nowhere close to the consumption observed.
> >
> >
> > Few details about the app:
> > I use windowed store defined as follows:
> >
> > StateStoreSupplier windowCounts = Stores.create(WINDOW_COUNT_STORE)
> >     .withIntegerKeys()
> >     .withLongValues()
> >     .persistent()
> >     .enableCaching()
> >     .windowed(MINUTES.toMillis(1), HOURS.toMillis(5), 10, false)
> >     .build();
> >
> > There is a processor that updates a count for a partition for a timestamp
> > that is rounded to an hour boundary:
> >
> > store.put(
> >     context.partition(),
> >     current(floor(value.getIngressTime())) + 1,
> >     floor(value.getIngressTime())
> > );
> >
> >
> > Any hints on what might cause this or any config settings?
> >
> > Thank you,
> > Stanislav.