I updated my consumer to that build. The memory issue seems to have abated.
TY!

Have started seeing this exception semi-regularly though:

ERROR  o.a.k.c.c.i.ConsumerCoordinator - User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group MinuteAgg failed on partition assignment

java.lang.IllegalStateException: task [1_4] Log end offset should not change while restoring
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.restoreActiveState(ProcessorStateManager.java:245)
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.register(ProcessorStateManager.java:198)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.register(ProcessorContextImpl.java:123)
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore.init(RocksDBWindowStore.java:206)
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:66)
        at org.apache.kafka.streams.state.internals.CachingWindowStore.init(CachingWindowStore.java:64)
        at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:81)
        at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:120)
        at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:660)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$100(StreamThread.java:69)
        at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:124)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:228)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:313)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:407)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
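
For anyone reading along: judging only by the exception text and the restoreActiveState frame, this looks like a sanity check made while a state store is rebuilt from its changelog. Below is a minimal, self-contained sketch of that kind of check. It is a toy model with hypothetical names (RestoreCheckSketch, restore, concurrentWriter), it uses no Kafka classes, and it is not the code in ProcessorStateManager.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: hypothetical names, no Kafka classes involved.
final class RestoreCheckSketch {

    /**
     * Copies records from an in-memory "changelog" into a "store" in batches.
     * The end offset is captured once, before copying starts. If the reader
     * ends up past that offset, something was appended to the log during the
     * restore, and the method fails.
     *
     * @param concurrentWriter hypothetical hook, run after each batch, that
     *                         simulates another writer touching the log
     * @return the number of records read from the changelog
     */
    static int restore(List<String> changelog, List<String> store,
                       int batchSize, Runnable concurrentWriter) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        final int endOffset = changelog.size(); // captured before restoring
        int position = 0;
        while (position < endOffset) {
            // Read whatever is available right now, up to one batch.
            int upTo = Math.min(position + batchSize, changelog.size());
            store.addAll(changelog.subList(position, upTo));
            position = upTo;
            if (position > endOffset) {
                throw new IllegalStateException(
                        "Log end offset should not change while restoring");
            }
            concurrentWriter.run();
        }
        return position;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>(Arrays.asList("a", "b", "c"));
        List<String> store = new ArrayList<>();
        // No-op writer: nothing is appended while the restore runs.
        int copied = restore(log, store, 2, () -> { });
        System.out.println("restored " + copied + " records: " + store);
    }
}
```

With a no-op writer the call simply copies the three records. If the hook appends to the log between batches, the reader overshoots the captured end offset and the sketch throws with the same message as above. Whether anything comparable is happening to the changelog for task 1_4 is only a guess.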

On Fri, Dec 9, 2016 at 4:29 PM, Jon Yeargers <jon.yearg...@cedexis.com>
wrote:

> Perhaps that's the problem. Yes - I'm still using 0.10.1.0.
>
> Does this involve a broker update?
>
> On Fri, Dec 9, 2016 at 7:12 AM, Damian Guy <damian....@gmail.com> wrote:
>
>> Hi Jon,
>>
>> Are you using 0.10.1? There is a resource leak to do with the Window
>> Iterator. The bug has been fixed on the 0.10.1 branch (which will soon be
>> released as 0.10.1.1), and it is also fixed in the Confluent fork.
>>
>> You can get the Confluent version by using the following:
>>
>> <repositories>
>>     <repository>
>>         <id>confluent</id>
>>         <url>http://packages.confluent.io/maven/</url>
>>     </repository>
>> </repositories>
>>
>> <dependency>
>>     <groupId>org.apache.kafka</groupId>
>>     <artifactId>kafka-streams</artifactId>
>>     <version>0.10.1.0-cp2</version>
>> </dependency>
>> <dependency>
>>     <groupId>org.apache.kafka</groupId>
>>     <artifactId>kafka-clients</artifactId>
>>     <version>0.10.1.0-cp2</version>
>> </dependency>
>>
>>
>> On Thu, 8 Dec 2016 at 23:37 Jon Yeargers <jon.yearg...@cedexis.com>
>> wrote:
>>
>> I'm working with JSON data that has an array member. I'm aggregating values
>> into this array using minute-long windows.
>>
>> I ran the app for ~10 minutes and watched it consume 40% of the memory on a
>> box with 32G. It was still growing when I stopped it. At this point it had
>> created ~800 values, each of which was < 1 MB in size (owing to the
>> limitations on message size set at the broker). (I wrote all the values
>> into Redis so I could count them and check the aggregation.)
>>
>> 1. Why is it consuming so much memory?
>> 2. Is there a strategy for controlling this growth?
>>
>> I get that it's keeping every window open in case a new value shows up.
>> Maybe there is some way to relax this using event time vs. clock time?
>>
>
>
