To help out, I have made the project that reproduces this issue publicly available at https://github.com/Hartimer/kafka-stream-issue
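
For quick reference, the settings that matter for the reproduction can be sketched roughly as below. This is a minimal sketch and not the code in the repository: the class name, the application id, the broker address and the helper method are assumptions made for illustration, and only the config key "max.poll.interval.ms" comes from this thread. Plain string keys are used so that the snippet compiles without the Kafka jars; in a real project the properties would be handed to KafkaStreams.

```java
import java.util.Properties;

public class ReproSettings {

    // Builds the properties for the reproduction. The application id and the
    // broker address are placeholders (assumptions), not values from the thread.
    public static Properties lowPollIntervalConfig(long maxPollIntervalMs) {
        Properties props = new Properties();
        props.setProperty("application.id", "kafka-stream-issue-repro");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Step 1 of the reproduction: a deliberately low poll interval.
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        return props;
    }

    // Step 2 of the reproduction: returns true when one pass through the
    // pipeline takes longer than the configured interval.
    public static boolean exceedsPollInterval(Properties props, long processingMs) {
        long intervalMs = Long.parseLong(props.getProperty("max.poll.interval.ms"));
        return processingMs > intervalMs;
    }

    public static void main(String[] args) {
        Properties props = lowPollIntervalConfig(5_000L);
        System.out.println(exceedsPollInterval(props, 30_000L)); // prints "true"
        System.out.println(exceedsPollInterval(props, 1_000L));  // prints "false"
    }
}
```

With a 5 second interval, pausing on a breakpoint for 30 seconds is well past the limit, which is the situation the steps quoted below describe.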

On Thu, Jun 8, 2017 at 11:40 PM João Peixoto <joao.harti...@gmail.com> wrote:

> I am now able to consistently reproduce this issue with a dummy project.
>
> 1. Set "max.poll.interval.ms" to a low value
> 2. Have the pipeline take longer than the interval above
> 3. Profit
>
> This happens every single time and never recovers.
> I simulated the delay by adding a breakpoint in my IDE on a sink "foreach"
> step and then proceeding after the above interval had elapsed.
>
> Any advice on how to work around this using 0.10.2.1 would be greatly
> appreciated.
> Hope it helps
>
> On Wed, Jun 7, 2017 at 10:19 PM João Peixoto <joao.harti...@gmail.com> wrote:
>
>> But my stream definition does not have a state store at all, RocksDB or
>> in memory... That's the most concerning part...
>>
>> On Wed, Jun 7, 2017 at 9:48 PM Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> One instance with 10 threads may cause RocksDB issues.
>>> How much RAM do you have?
>>>
>>> Also check CPU wait time. Many RocksDB instances on one machine (depending
>>> on the number of partitions) may cause a lot of disk I/O, causing wait
>>> times to increase and hence slowing down message processing, which leads
>>> to frequent rebalances.
>>>
>>> Also, how many partitions does your topic have? My experience is that
>>> having one thread per partition is ideal.
>>>
>>> Thanks
>>> Sachin
>>>
>>> On Thu, Jun 8, 2017 at 9:58 AM, João Peixoto <joao.harti...@gmail.com> wrote:
>>>
>>>> There is one instance with 10 threads.
>>>>
>>>> On Wed, Jun 7, 2017 at 9:07 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>> João,
>>>>>
>>>>> Do you also have multiple running instances in parallel, and how many
>>>>> threads are you running within each instance?
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>
>>>>>> Eno, before I do so I just want to be sure this would not be a
>>>>>> duplicate. I just found the following issues:
>>>>>>
>>>>>> * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being
>>>>>>   fixed on 0.11.0.0/0.10.2.2 (both not released afaik)
>>>>>> * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in
>>>>>>   progress
>>>>>>
>>>>>> On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> This might be a bug, would you mind opening a JIRA (copy-pasting
>>>>>>> below is sufficient).
>>>>>>>
>>>>>>> Thanks
>>>>>>> Eno
>>>>>>>
>>>>>>> On 7 Jun 2017, at 21:38, João Peixoto <joao.harti...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I'm using Kafka Streams 0.10.2.1 and I still see this error
>>>>>>>>
>>>>>>>> 2017-06-07 20:28:37.211 WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
>>>>>>>>
>>>>>>>> org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
>>>>>>>>
>>>>>>>> It has been printing it for hours now, so it does not recover at all.
>>>>>>>> The most worrying thing is that this stream definition does not even
>>>>>>>> use state stores, it literally looks like this:
>>>>>>>>
>>>>>>>> KStreamBuilder builder = new KStreamBuilder();
>>>>>>>> KStream<byte[], Message> kStream =
>>>>>>>>     builder.stream(appOptions.getInput().getTopic());
>>>>>>>> kStream.process(() -> processor);
>>>>>>>> new KafkaStreams(builder, streamsConfiguration);
>>>>>>>>
>>>>>>>> The "processor" does its thing and calls "context().commit()" when
>>>>>>>> done. That's it. Looking at the actual machine running the instance,
>>>>>>>> the folders under /tmp/kafka-streams/<stream name>/ only have a .lock
>>>>>>>> file.
>>>>>>>>
>>>>>>>> This seems to have been bootstrapped by the exception:
>>>>>>>>
>>>>>>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>>>>>>> cannot be completed since the group has already rebalanced and
>>>>>>>> assigned the partitions to another member. This means that the time
>>>>>>>> between subsequent calls to poll() was longer than the configured
>>>>>>>> max.poll.interval.ms, which typically implies that the poll loop is
>>>>>>>> spending too much time message processing. You can address this
>>>>>>>> either by increasing the session timeout or by reducing the maximum
>>>>>>>> size of batches returned in poll() with max.poll.records.
>>>>>>>>
>>>>>>>> We are addressing the latter by reducing "max.poll.records" and
>>>>>>>> increasing "commit.interval.ms", nonetheless, shouldn't Kafka Streams
>>>>>>>> not worry about state dirs if there are no state stores? Since it
>>>>>>>> doesn't seem to do so automatically, can I configure it somehow to
>>>>>>>> achieve this end?
>>>>>>>>
>>>>>>>> Additionally, what could lead to it not being able to recover?
>>>>>>>>
>>>>>>>> On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Great! :)
>>>>>>>>>
>>>>>>>>> On 5/16/17 2:31 AM, Sameer Kumar wrote:
>>>>>>>>>> I see now that my Kafka cluster is very stable, and these errors
>>>>>>>>>> don't come now.
>>>>>>>>>>
>>>>>>>>>> -Sameer.
>>>>>>>>>>
>>>>>>>>>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes, I have upgraded my cluster and client both to version 10.2.1
>>>>>>>>>>> and am currently monitoring the situation.
>>>>>>>>>>> Will report back in case I find any errors. Thanks for the help
>>>>>>>>>>> though.
>>>>>>>>>>>
>>>>>>>>>>> -Sameer.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Did you see Eno's reply?
>>>>>>>>>>>>
>>>>>>>>>>>> Please try out Streams 0.10.2.1 -- this should be fixed there.
>>>>>>>>>>>> If not, please report back.
>>>>>>>>>>>>
>>>>>>>>>>>> I would also recommend subscribing to the list. It's
>>>>>>>>>>>> self-service: http://kafka.apache.org/contact
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
>>>>>>>>>>>>> My brokers are on version 10.1.0 and my clients are on version
>>>>>>>>>>>>> 10.2.0.
>>>>>>>>>>>>> Also, please reply to all, I am currently not subscribed to the
>>>>>>>>>>>>> list.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Sameer.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I ran two nodes in my streams compute cluster. They were
>>>>>>>>>>>>>> running fine for a few hours before failing with "Failed to
>>>>>>>>>>>>>> rebalance" errors.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I couldn't understand why this happened but I saw one strange
>>>>>>>>>>>>>> behaviour...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> At 16:53 on node1, I saw a "Failed to lock the state
>>>>>>>>>>>>>> directory" error. This might have caused the partitions to
>>>>>>>>>>>>>> relocate and hence the error.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am attaching detailed logs for both the nodes, please see if
>>>>>>>>>>>>>> you can help.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Some of the logs for quick reference are these.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in thread StreamThread-2
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
>>>>>>>>>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] failed to suspend stream tasks
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
>>>>>>>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
>>>>>>>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
>>>>>>>>>>>>>>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>>>>>>>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
>>>>>>>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
>>>>>>>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
>>>>>>>>>>>>>>   ... 1 more
>>>>>>>>>>>>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
>>>>>>>>>>>>>> Commit cannot be completed since the group has already
>>>>>>>>>>>>>> rebalanced and assigned the partitions to another member. This
>>>>>>>>>>>>>> means that the time between subsequent calls to poll() was
>>>>>>>>>>>>>> longer than the configured max.poll.interval.ms, which
>>>>>>>>>>>>>> typically implies that the poll loop is spending too much time
>>>>>>>>>>>>>> message processing. You can address this either by increasing
>>>>>>>>>>>>>> the session timeout or by reducing the maximum size of batches
>>>>>>>>>>>>>> returned in poll() with max.poll.records.
>>>>>>>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
>>>>>>>>>>>>>>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
>>>>>>>>>>>>>>   at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
>>>>>>>>>>>>>>   ... 10 more
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-05-03 16:53:57 WARN StreamThread:1184 - Could not create task 1_38. Will retry.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.kafka.streams.errors.LockException: task [1_38] Failed to lock the state directory: /data/streampoc/LIC2-5/1_38
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
>>>>>>>>>>>>>>   at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Sameer.
>>>>>
>>>>> --
>>>>> -- Guozhang