Hi Guozhang,

Thanks for your response. I'm on KafkaStreams 2.8.1. Since you asked, is KafkaStreams 3.0.0 compatible with a 2.4.1 broker?
But I found the issue.

TL;DR: I increased max.poll.interval.ms and decreased max.poll.records, which fixed the problem.

I noticed that the StreamThread summary log, "Processed X total records, ran X punctuators, and committed X total tasks since the last update", appears roughly every 3 minutes. Looking at the code, it should be printed after a polling loop completes, as long as at least 2 minutes have passed since the last summary. Then I noticed that sometimes the gap between these messages was much larger, around 10 minutes. So I concluded that the polling loop was taking too long.

So I increased max.poll.interval.ms and decreased max.poll.records (which, by the way, we had set to a non-default value of 1000 rather than 500). That improved the situation: the polling loops rarely timed out while the application was catching up.

The weird thing I noticed is that, even after it had caught up and there were no more polling timeouts, we still had rebalances for an extra 2 hours, with messages saying:

Finished unstable assignment of tasks

For some reason, tasks seemed to keep migrating for 2 hours even after the polling had stopped timing out and the message backlog had been processed.

But now the service is stable again.

Thanks
Murilo

On Wed, 13 Oct 2021 at 14:27, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Murilo, which version of Kafka Streams are you using? Could you try the
> latest 3.0.0 release if it is not yet on that version?
>
> On Wed, Oct 13, 2021 at 8:02 AM Murilo Tavares <murilo...@gmail.com> wrote:
>
> > Hi
> > I have a large, stateful, KafkaStreams application that is on a never
> > ending rebalance loop.
> > I can see that Task restorations take a loooong time (circa 30-45 min). And
> > after that I see this error.
> > This is followed by tasks being suspended, and the instance re-joining the
> > group and a new rebalance is triggered.
> > Any ideas on how to fix this?
> >
> > WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [inventory-streams-green-0-StreamThread-1] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.
> > org.apache.kafka.streams.errors.TaskMigratedException: Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated.
> >     at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1141) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:541) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:95) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:408) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [app.jar:?]
> > Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1490) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1438) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1139) ~[app.jar:?]
> >     ... 15 more
>
> --
> -- Guozhang
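
For reference, the two overrides described in the reply above could be expressed roughly as follows. This is only a minimal sketch, assuming the settings are supplied as plain consumer properties alongside the rest of the Streams configuration. The class name, the method name, and the values 600000 and 250 are placeholders for illustration; the values actually used are not stated in this thread. The idea is that a larger max.poll.interval.ms allows more time between poll() calls before the consumer is considered failed, while a smaller max.poll.records shortens each iteration.

```java
import java.util.Properties;

// Hypothetical helper, not from the thread: builds the two consumer
// overrides discussed above as plain string properties.
class PollTuningSketch {

    // Real Kafka consumer setting names; the values are supplied by the caller.
    static final String MAX_POLL_INTERVAL_MS = "max.poll.interval.ms";
    static final String MAX_POLL_RECORDS = "max.poll.records";

    static Properties tunedOverrides(int maxPollIntervalMs, int maxPollRecords) {
        if (maxPollIntervalMs <= 0 || maxPollRecords <= 0) {
            throw new IllegalArgumentException("both values must be positive");
        }
        Properties props = new Properties();
        props.setProperty(MAX_POLL_INTERVAL_MS, Integer.toString(maxPollIntervalMs));
        props.setProperty(MAX_POLL_RECORDS, Integer.toString(maxPollRecords));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values: a longer allowed gap between polls,
        // and fewer records fetched per poll.
        Properties overrides = tunedOverrides(600_000, 250);
        overrides.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

In an application these entries would typically be merged into the same Properties object that already carries application.id and bootstrap.servers before the KafkaStreams instance is constructed. Suitable values depend on how long one batch takes to process.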