unsubscribe

On Thu, 14 Oct 2021 at 18:04, Murilo Tavares <murilo...@gmail.com> wrote:

> Hi Guozhang
> Thanks for your response. I'm on KafkaStreams 2.8.1.
> Since you asked, is KafkaStreams 3.0.0 compatible with a 2.4.1 broker?
>
> But I found the issue.
> TL;DR: I increased max.poll.interval.ms and decreased max.poll.records,
> which fixed the problem.
>
> I noticed the StreamThread summary logs
>   Processed X total records, ran X punctuators, and committed X total
>   tasks since the last update
> every 3 minutes or so. Looking at the code, I noticed they should be
> printed after a polling loop is completed, as long as at least 2 min have
> passed since the last summary.
>
> Then I noticed that sometimes these messages would have a larger interval,
> of around 10 min or so. So I came to the conclusion that polling was
> taking too long.
> So I increased max.poll.interval.ms and decreased max.poll.records (which
> we btw had on a non-default value of 1000, rather than 500).
> That improved the problem, as the polling loops would not time out very
> often until it caught up.
> The weird thing I noticed is that, even after it caught up and there were
> no more polling timeouts, we still had rebalances for an extra 2h, with
> messages saying:
>   Finished unstable assignment of tasks
> For some reason, tasks seemed to keep migrating for 2h even after the
> polling stopped timing out and the message backlog had been processed.
> But now the service is stable again.
>
> Thanks
> Murilo
>
> On Wed, 13 Oct 2021 at 14:27, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Murilo, which version of Kafka Streams are you using? Could you try
> > the latest 3.0.0 release if it is not yet on that version?
> >
> > On Wed, Oct 13, 2021 at 8:02 AM Murilo Tavares <murilo...@gmail.com>
> > wrote:
> >
> > > Hi
> > > I have a large, stateful, KafkaStreams application that is on a
> > > never-ending rebalance loop.
> > > I can see that Task restorations take a loooong time (circa 30-45
> > > min). And after that I see this error.
> > > This is followed by tasks being suspended, and the instance re-joining
> > > the group and a new rebalance is triggered.
> > > Any ideas on how to fix this?
> > >
> > > WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [inventory-streams-green-0-StreamThread-1] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.
> > > org.apache.kafka.streams.errors.TaskMigratedException: Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated.
> > >     at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1141) ~[app.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:541) ~[app.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:95) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:408) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[app.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) ~[app.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) ~[app.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [app.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [app.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [app.jar:?]
> > > Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1490) ~[app.jar:?]
> > >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1438) ~[app.jar:?]
> > >     at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1139) ~[app.jar:?]
> > >     ... 15 more
> > >
> >
> > --
> > -- Guozhang
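
For reference, the fix described in the 14 Oct reply (raise max.poll.interval.ms, lower max.poll.records) might be expressed roughly as below. This is only a sketch: the class name PollTuningSketch, the helper pollTuning(), and both numeric values are assumptions made for illustration, since the thread does not say which values were actually used. Only the two property names come from the thread.

```java
import java.util.Properties;

// Hypothetical helper, not taken from the thread.
class PollTuningSketch {

    // Builds overrides that could be merged into the Properties passed
    // to a Kafka Streams application. Both values are placeholders.
    static Properties pollTuning() {
        Properties props = new Properties();
        // Allow more time between poll() calls before the member is
        // considered failed (illustrative value: 15 minutes).
        props.put("max.poll.interval.ms", "900000");
        // Fetch fewer records per poll() so each polling loop finishes
        // sooner (the thread mentions a previous value of 1000).
        props.put("max.poll.records", "250");
        return props;
    }

    public static void main(String[] args) {
        Properties props = pollTuning();
        System.out.println("max.poll.interval.ms=" + props.getProperty("max.poll.interval.ms"));
        System.out.println("max.poll.records=" + props.getProperty("max.poll.records"));
    }
}
```

The two settings pull in the same direction: a smaller batch shortens each processing pass, and a longer interval gives a slow pass more room before the consumer is dropped from the group. Suitable values depend on per-record processing time and would need to be measured for the application in question.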