Hi Guozhang
Thanks for your response. I'm on KafkaStreams 2.8.1.
Since you asked: is KafkaStreams 3.0.0 compatible with a 2.4.1 broker?

But I found the issue.
TL;DR: I increased max.poll.interval.ms and decreased max.poll.records,
which fixed the problem.

I noticed that the StreamThread summary log,
"Processed X total records, ran X punctuators, and committed X total tasks
since the last update", appeared every 3 minutes or so. Looking at the code,
I saw that it is printed after a polling loop completes, as long as at least
2 minutes have passed since the last summary.
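
To illustrate why a longer gap between summaries points at a slow polling loop, here is a simplified, hypothetical sketch of a time-gated summary log. It is not Kafka's actual StreamThread code; the class and method names (SummaryLogger, onLoopCompleted) are made up, and the 2-minute threshold follows the description above. Because the check only runs when a loop iteration finishes, a summary can never appear more often than every 2 minutes, and a single 10-minute iteration stretches the gap to 10 minutes.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a summary log that is checked once per poll-loop
// iteration and emitted only if the interval has elapsed. NOT StreamThread code.
class SummaryLogger {
    // Assumed 2-minute threshold, as described in the message above.
    static final long LOG_SUMMARY_INTERVAL_MS = TimeUnit.MINUTES.toMillis(2);

    private long lastSummaryMs;
    private long processedSinceLastSummary = 0;

    SummaryLogger(long startMs) {
        this.lastSummaryMs = startMs;
    }

    // Called at the end of each poll-loop iteration; returns true if a summary was logged.
    boolean onLoopCompleted(long nowMs, long recordsProcessedThisLoop) {
        processedSinceLastSummary += recordsProcessedThisLoop;
        if (nowMs - lastSummaryMs < LOG_SUMMARY_INTERVAL_MS) {
            return false; // too soon: keep accumulating
        }
        System.out.printf("Processed %d total records since the last update (gap: %d s)%n",
                processedSinceLastSummary, (nowMs - lastSummaryMs) / 1000);
        lastSummaryMs = nowMs;
        processedSinceLastSummary = 0;
        return true;
    }

    public static void main(String[] args) {
        SummaryLogger logger = new SummaryLogger(0);
        long now = 0;
        // Hypothetical loops taking 90 s each: a summary every 2nd loop, i.e. every 3 minutes.
        for (int i = 0; i < 4; i++) {
            now += TimeUnit.SECONDS.toMillis(90);
            logger.onLoopCompleted(now, 1000);
        }
        // One hypothetical 10-minute loop: the next summary comes 10 minutes after the previous one.
        now += TimeUnit.MINUTES.toMillis(10);
        logger.onLoopCompleted(now, 1000);
    }
}
```

Running main prints three summaries, with gaps of 180 s, 180 s and 600 s.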

Then I noticed that the interval between these messages was sometimes much
larger, around 10 minutes, so I concluded that polling was taking too long.
So I increased max.poll.interval.ms and decreased max.poll.records (which, by
the way, we had set to a non-default value of 1000 rather than 500).
That mitigated the problem, as the polling loops no longer timed out very
often while the application was catching up.
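
For reference, a minimal sketch of the two overrides described above. The values are hypothetical, since the thread does not say which ones were used; plain string keys keep the sketch dependency-free. In a real application they would typically go into the same Properties passed to new KafkaStreams(topology, props), optionally scoped to the consumer with StreamsConfig.consumerPrefix(...).

```java
import java.util.Properties;

class PollTuning {
    // Builds hypothetical poll-related overrides; values are illustrative only.
    static Properties pollOverrides() {
        Properties props = new Properties();
        // Max time between poll() calls before the consumer is considered failed and
        // removed from the group (plain consumer default: 300000 ms = 5 min).
        props.put("max.poll.interval.ms", "900000"); // hypothetical: 15 min
        // Fewer records per poll means less work per polling-loop iteration.
        props.put("max.poll.records", "250");        // hypothetical value
        return props;
    }

    public static void main(String[] args) {
        pollOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The two settings work together: a smaller max.poll.records shortens each iteration, while a larger max.poll.interval.ms gives a slow iteration (for example, during heavy restoration) more headroom before the thread is kicked out of the group.
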
Oddly, even after the backlog had been processed and polling had stopped
timing out, rebalances continued for another 2 hours, with tasks apparently
still migrating and messages saying:
"Finished unstable assignment of tasks"
But now the service is stable again.
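
One possible explanation, not confirmed in this thread: since 2.6, the Kafka Streams assignor (KIP-441) moves stateful tasks gradually. It keeps a task on its caught-up owner, places a warmup replica on the intended destination, and after an unstable assignment schedules a follow-up probing rebalance every probing.rebalance.interval.ms until the warmups catch up. With few warmups allowed at once, moving many large stores can take many rounds. The sketch below only shows the relevant configs with hypothetical override values; it is not a verified fix.

```java
import java.util.Properties;

class WarmupTuning {
    // Hypothetical overrides for the KIP-441 task-movement configs.
    static Properties warmupOverrides() {
        Properties props = new Properties();
        // Max number of extra warmup replicas assigned at once while a task's
        // state is copied to its new instance (Streams default: 2).
        props.put("max.warmup.replicas", "4");                // hypothetical value
        // Max lag (in offsets) at which an instance counts as caught up
        // for a stateful task (Streams default: 10000).
        props.put("acceptable.recovery.lag", "10000");        // default, shown for reference
        // Delay before each follow-up probing rebalance that checks whether
        // warmups have caught up (Streams default: 600000 ms = 10 min).
        props.put("probing.rebalance.interval.ms", "300000"); // hypothetical: 5 min
        return props;
    }

    public static void main(String[] args) {
        warmupOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The trade-off: more warmups means more concurrent restoration load, and a shorter probing interval means more frequent (if cheaper) rebalances.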

Thanks
Murilo



On Wed, 13 Oct 2021 at 14:27, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Murilo, which version of Kafka Streams are you using? Could you try the
> latest 3.0.0 release if you are not on that version yet?
>
> On Wed, Oct 13, 2021 at 8:02 AM Murilo Tavares <murilo...@gmail.com> wrote:
>
> > Hi
> > I have a large, stateful KafkaStreams application that is stuck in a
> > never-ending rebalance loop.
> > I can see that task restoration takes a loooong time (circa 30-45 min),
> > and after that I see the error below.
> > This is followed by tasks being suspended, and the instance rejoining the
> > group and triggering a new rebalance.
> > Any ideas on how to fix this?
> >
> > WARN org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [inventory-streams-green-0-StreamThread-1] Detected that the thread is being fenced. This implies that this thread missed a rebalance and dropped out of the consumer group. Will close out all assigned tasks and rejoin the consumer group.
> > org.apache.kafka.streams.errors.TaskMigratedException: Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated.
> >     at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1141) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(TaskManager.java:541) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:95) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:312) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:408) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:449) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [app.jar:?]
> > Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1490) ~[app.jar:?]
> >     at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1438) ~[app.jar:?]
> >     at org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction(TaskManager.java:1139) ~[app.jar:?]
> >     ... 15 more
> >
>
>
> --
> -- Guozhang
>
