unsubscribe

On Thu, 14 Oct 2021 at 18:04, Murilo Tavares <murilo...@gmail.com> wrote:

> Hi Guozhang
> Thanks for your response. I'm on KafkaStreams 2.8.1.
> Since you asked, is KafkaStreams 3.0.0 compatible with a 2.4.1 broker?
>
> But I found the issue.
> TL;DR: I increased max.poll.interval.ms and decreased max.poll.records
> which
> fixed the problem.
>
> I noticed the StreamThread summary logs
> Processed X total records, ran X punctuators, and committed X total tasks
> since the last update
> every 3 minutes and so. Looking at the code, I noticed they should be
> printed after a polling loop is completed as long as at least 2 min have
> passed since the last summary.
>
> Then I noticed that sometimes these messages would have a larger interval,
> in around 10 min or so. So I got the conclusion that polling was taking too
> long.
> So I increased max.poll.interval.ms and decreased max.poll.records (which
> we btw had on a non-default value of 1000, rather than 500).
> That improved the problem, as the polling loops would not timeout very
> often until it caught up.
> The weird thing I noticed is that, even after caught up and no more polling
> timeouts, we still had rebalances for an extra 2h, with messages saying:
> Finished unstable assignment of tasks
> For some reason, tasks seemed to keep migrating for 2h even after the
> polling stopped timing out and the message backlog had been processed.
> But now the service is stable again.
>
> Thanks
> Murilo
>
>
>
> On Wed, 13 Oct 2021 at 14:27, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Murilo, which version of Kafka Streams are you using? Could you try
> the
> > latest 3.0.0 release if it is not yet on that version?
> >
> > On Wed, Oct 13, 2021 at 8:02 AM Murilo Tavares <murilo...@gmail.com>
> > wrote:
> >
> > > Hi
> > > I have a large, stateful, KafkaStreams application that is on a never
> > > ending rebalance loop.
> > > I can see that Task restorations take a loooong time (circa 30-45 min).
> > And
> > > after that I see this error.
> > > This is followed by tasks being suspended, and the instance re-joining
> > the
> > > group and a new rebalance is triggered.
> > > Any ideas on how to fix this?
> > >
> > > WARN org.apache.kafka.streams.processor.internals.StreamThread -
> stream-
> > > thread [inventory-streams-green-0-StreamThread-1] Detected that the
> > thread
> > > is being fenced. This implies that this thread missed a rebalance and
> > > dropped out of the consumer group. Will close out all assigned tasks
> and
> > > rejoin the consumer group.
> > > org.apache.kafka.streams.errors.TaskMigratedException: Consumer
> > committing
> > > offsets failed, indicating the corresponding thread is no longer part
> of
> > > the group; it means all tasks belonging to this thread should be
> > migrated.
> > > at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction
> > > (TaskManager.java:1141) ~[app.jar:?] at
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.handleRevocation(
> > > TaskManager.java:541) ~[app.jar:?] at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked
> > > (StreamsRebalanceListener.java:95) ~[app.jar:?] at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked
> > > (ConsumerCoordinator.java:312) ~[app.jar:?] at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete
> > > (ConsumerCoordinator.java:408) ~[app.jar:?] at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded
> > > (AbstractCoordinator.java:449) ~[app.jar:?] at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup
> > > (AbstractCoordinator.java:365) ~[app.jar:?] at
> > > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(
> > > ConsumerCoordinator.java:508) ~[app.jar:?] at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded
> > > (KafkaConsumer.java:1261) ~[app.jar:?] at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230
> > > ) ~[app.jar:?] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > KafkaConsumer.java:1210) ~[app.jar:?] at
> > > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(
> > > StreamThread.java:925) ~[app.jar:?] at
> > > org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(
> > > StreamThread.java:885) ~[app.jar:?] at
> > > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(
> > > StreamThread.java:720) [app.jar:?] at
> > > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > StreamThread.java:583) [app.jar:?] at
> > > org.apache.kafka.streams.processor.internals.StreamThread.run(
> > > StreamThread.java:556) [app.jar:?] Caused by:
> > > org.apache.kafka.clients.consumer.CommitFailedException: Offset commit
> > > cannot be completed since the consumer is not part of an active group
> for
> > > auto partition assignment; it is likely that the consumer was kicked
> out
> > of
> > > the group. at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest
> > > (ConsumerCoordinator.java:1139) ~[app.jar:?] at
> > >
> > >
> >
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync
> > > (ConsumerCoordinator.java:1004) ~[app.jar:?] at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
> > > KafkaConsumer.java:1490) ~[app.jar:?] at
> > > org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(
> > > KafkaConsumer.java:1438) ~[app.jar:?] at
> > >
> > >
> >
> org.apache.kafka.streams.processor.internals.TaskManager.commitOffsetsOrTransaction
> > > (TaskManager.java:1139) ~[app.jar:?] ... 15 more
> > >
> >
> >
> > --
> > -- Guozhang
> >
>

Reply via email to