João,

Do you also have multiple instances running in parallel, and how many threads are you running within each instance?
Guozhang

On Wed, Jun 7, 2017 at 3:18 PM, João Peixoto <joao.harti...@gmail.com> wrote:

> Eno, before I do so I just want to be sure this would not be a duplicate. I
> just found the following issues:
>
> * https://issues.apache.org/jira/browse/KAFKA-5167. Marked as being fixed
>   on 0.11.0.0/0.10.2.2 (both not released afaik)
> * https://issues.apache.org/jira/browse/KAFKA-5070. Currently in progress
>
> On Wed, Jun 7, 2017 at 2:24 PM Eno Thereska <eno.there...@gmail.com> wrote:
>
> > Hi there,
> >
> > This might be a bug, would you mind opening a JIRA (copy-pasting below is
> > sufficient).
> >
> > Thanks
> > Eno
> >
> > > On 7 Jun 2017, at 21:38, João Peixoto <joao.harti...@gmail.com> wrote:
> > >
> > > I'm using Kafka Streams 0.10.2.1 and I still see this error
> > >
> > > 2017-06-07 20:28:37.211 WARN 73 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : Could not create task 0_31. Will retry.
> > >
> > > org.apache.kafka.streams.errors.LockException: task [0_31] Failed to lock the state directory for task 0_31
> > >     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100) ~[kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73) ~[kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108) ~[kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864) [kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237) ~[kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210) ~[kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967) [kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69) [kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234) [kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259) [kafka-clients-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352) [kafka-clients-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303) [kafka-clients-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290) [kafka-clients-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029) [kafka-clients-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) [kafka-clients-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592) [kafka-streams-0.10.2.1.jar!/:na]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361) [kafka-streams-0.10.2.1.jar!/:na]
> > >
> > > It has been printing it for hours now, so it does not recover at all.
> > > The most worrying thing is that this stream definition does not even use
> > > state stores, it literally looks like this:
> > >
> > > KStreamBuilder builder = new KStreamBuilder();
> > > KStream<byte[], Message> kStream = builder.stream(appOptions.getInput().getTopic());
> > > kStream.process(() -> processor);
> > > new KafkaStreams(builder, streamsConfiguration);
> > >
> > > The "processor" does its thing and calls "context().commit()" when done.
> > > That's it. Looking at the actual machine running the instance, the folders
> > > under /tmp/kafka-streams/<stream name>/ only have a .lock file.
> > >
> > > This seems to have been bootstrapped by the exception:
> > >
> > > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> > > completed since the group has already rebalanced and assigned the
> > > partitions to another member. This means that the time between subsequent
> > > calls to poll() was longer than the configured max.poll.interval.ms, which
> > > typically implies that the poll loop is spending too much time message
> > > processing. You can address this either by increasing the session timeout
> > > or by reducing the maximum size of batches returned in poll() with
> > > max.poll.records.
> > >
> > > We are addressing the latter by reducing "max.poll.records" and increasing
> > > "commit.interval.ms", nonetheless, shouldn't Kafka Streams not worry about
> > > state dirs if there are no state stores? Since it doesn't seem to do so
> > > automatically, can I configure it somehow to achieve this end?
> > >
> > > Additionally, what could lead to it not being able to recover?
> > >
> > > On Tue, May 16, 2017 at 3:17 PM Matthias J. Sax <matth...@confluent.io> wrote:
> > >
> > >> Great! :)
> > >>
> > >> On 5/16/17 2:31 AM, Sameer Kumar wrote:
> > >>> I see now that my Kafka cluster is very stable, and these errors don't come now.
> > >>>
> > >>> -Sameer.
> > >>>
> > >>> On Fri, May 5, 2017 at 7:53 AM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> > >>>
> > >>>> Yes, I have upgraded my cluster and client both to version 10.2.1 and
> > >>>> am currently monitoring the situation.
> > >>>> Will report back in case I find any errors. Thanks for the help though.
> > >>>>
> > >>>> -Sameer.
> > >>>>
> > >>>> On Fri, May 5, 2017 at 3:37 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>>>
> > >>>>> Did you see Eno's reply?
> > >>>>>
> > >>>>> Please try out Streams 0.10.2.1 -- this should be fixed there. If not,
> > >>>>> please report back.
> > >>>>>
> > >>>>> I would also recommend to subscribe to the list. It's self-service
> > >>>>> http://kafka.apache.org/contact
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 5/3/17 10:49 PM, Sameer Kumar wrote:
> > >>>>>> My brokers are on version 10.1.0 and my clients are on version 10.2.0.
> > >>>>>> Also, please reply to all, I am currently not subscribed to the list.
> > >>>>>>
> > >>>>>> -Sameer.
> > >>>>>>
> > >>>>>> On Wed, May 3, 2017 at 6:34 PM, Sameer Kumar <sam.kum.w...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> I ran two nodes in my streams compute cluster, they were running fine for
> > >>>>>>> a few hours before failing with failure to rebalance errors.
> > >>>>>>>
> > >>>>>>> I couldn't understand why this happened but I saw one strange behaviour...
> > >>>>>>>
> > >>>>>>> at 16:53 on node1, I saw "Failed to lock the state directory" error, this
> > >>>>>>> might have caused the partitions to relocate and hence the error.
> > >>>>>>>
> > >>>>>>> I am attaching detailed logs for both the nodes, please see if you can help.
> > >>>>>>>
> > >>>>>>> Some of the logs for quick reference are these.
> > >>>>>>>
> > >>>>>>> 2017-05-03 16:53:53 ERROR Kafka10Base:44 - Exception caught in thread StreamThread-2
> > >>>>>>>
> > >>>>>>> org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] Failed to rebalance
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:612)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)
> > >>>>>>> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread [StreamThread-2] failed to suspend stream tasks
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:488)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$1200(StreamThread.java:69)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsRevoked(StreamThread.java:259)
> > >>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:396)
> > >>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:329)
> > >>>>>>>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
> > >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
> > >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
> > >>>>>>>     ... 1 more
> > >>>>>>> Caused by: org.apache.kafka.clients.consumer.CommitFailedException:
> > >>>>>>> Commit cannot be completed since the group has already rebalanced and
> > >>>>>>> assigned the partitions to another member. This means that the time between
> > >>>>>>> subsequent calls to poll() was longer than the configured
> > >>>>>>> max.poll.interval.ms, which typically implies that the poll loop is
> > >>>>>>> spending too much time message processing. You can address this either by
> > >>>>>>> increasing the session timeout or by reducing the maximum size of batches
> > >>>>>>> returned in poll() with max.poll.records.
> > >>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:698)
> > >>>>>>>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:577)
> > >>>>>>>     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1125)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:296)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:535)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.performOnAllTasks(StreamThread.java:503)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.commitOffsets(StreamThread.java:531)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:480)
> > >>>>>>>     ... 10 more
> > >>>>>>>
> > >>>>>>> 2017-05-03 16:53:57 WARN StreamThread:1184 - Could not create task 1_38. Will retry.
> > >>>>>>>
> > >>>>>>> org.apache.kafka.streams.errors.LockException: task [1_38] Failed to lock the state directory: /data/streampoc/LIC2-5/1_38
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:102)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
> > >>>>>>>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
> > >>>>>>>
> > >>>>>>> Regards,
> > >>>>>>>
> > >>>>>>> -Sameer.

--
-- Guozhang
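
For anyone reading this thread later: the settings mentioned above (max.poll.records, max.poll.interval.ms, commit.interval.ms) and the state directory can all be supplied through the Properties object handed to KafkaStreams. Below is a minimal sketch of that. The class name, method name, directory layout, and every value are assumptions made up for illustration and were not posted by anyone in this thread. Only the config key names are Kafka's own. The per-instance state.dir reflects a guess that several instances on one host might otherwise contend for the same lock files; it is not a confirmed fix for the LockException.

```java
import java.util.Properties;

public class StreamsConfigSketch {

    // Hypothetical helper: builds a Properties object using Kafka's config key names.
    // All values are illustrative assumptions, not recommendations from the thread.
    static Properties buildConfig(String applicationId, String bootstrapServers, String instanceName) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);

        // Fewer records per poll() leaves less processing to do between two polls.
        props.setProperty("max.poll.records", "100");
        // Upper bound on the time allowed between two poll() calls.
        props.setProperty("max.poll.interval.ms", "600000");
        // How often Streams commits on its own.
        props.setProperty("commit.interval.ms", "60000");

        // Assumption: one state directory per instance, so that instances sharing
        // a host do not use the same task directories and .lock files.
        props.setProperty("state.dir", "/var/lib/kafka-streams/" + instanceName);
        props.setProperty("num.stream.threads", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig("my-streams-app", "localhost:9092", "instance-1");
        // Print the resulting configuration in a stable order.
        props.stringPropertyNames().stream().sorted()
             .forEach(key -> System.out.println(key + "=" + props.getProperty(key)));
    }
}
```

The returned object could take the place of "streamsConfiguration" in the snippet quoted above. Whether any of these values suits a given workload has to be measured.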