OK, I tried to reproduce the issue with a minimal example and saw the same
results. It seems like there's something weird going on with that exact
topology that's causing it to get stuck during the assignment. Maybe it's
causing an unexpected cycle in the topology that the assignor can't handle?
Pretty weird that even removing the windowedBy fixes the issue, since a
topology with a windowed aggregation is pretty much isomorphic to one with
just a regular aggregation.
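
To make the "unexpected cycle" guess a bit more concrete, here's a rough,
purely illustrative sketch of checking a directed graph of processor nodes
for a cycle. To be clear, this is NOT how the StreamsPartitionAssignor
works internally, just a plain depth-first search; the class name and the
node names mentioned afterwards are made up for the example.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka Streams.
class TopologyCycleCheck {

    // Returns true if the directed graph (node -> downstream nodes) has a cycle.
    static boolean hasCycle(Map<String, List<String>> downstream) {
        Set<String> done = new HashSet<>();   // nodes whose subgraph is fully explored
        Set<String> onPath = new HashSet<>(); // nodes on the current DFS path
        for (String node : downstream.keySet()) {
            if (visit(node, downstream, done, onPath)) {
                return true;
            }
        }
        return false;
    }

    private static boolean visit(String node,
                                 Map<String, List<String>> downstream,
                                 Set<String> done,
                                 Set<String> onPath) {
        if (onPath.contains(node)) {
            return true;  // we came back to a node on the current path: cycle
        }
        if (done.contains(node)) {
            return false; // already explored, no cycle reachable from here
        }
        onPath.add(node);
        // Nodes that never appear as a key are treated as having no children.
        for (String next : downstream.getOrDefault(node, Collections.emptyList())) {
            if (visit(next, downstream, done, onPath)) {
                return true;
            }
        }
        onPath.remove(node);
        done.add(node);
        return false;
    }
}
```

For instance, a map with "aggregate" -> ["fk-join"] and "fk-join" ->
["aggregate"] makes hasCycle return true, while a straight chain such as
"source" -> ["group"], "group" -> ["aggregate"] returns false.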

Can you create a JIRA ticket for this and include your observations + link to
the example? It's definitely a bug, and we'll need to look into this to
understand what's going wrong here.
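
For the ticket, it might also help to quantify how long the StreamThread goes
quiet in the logs. Here's a rough sketch, assuming the timestamp layout of the
log lines quoted in this thread ("2020-10-28 12:22:37.879 ..."); the class and
method names are made up for illustration, and lines without a leading
timestamp (wrapped continuation lines) are simply skipped.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.List;

// Hypothetical helper for eyeballing logs, not part of Kafka.
class LogGapFinder {

    // Timestamp layout assumed from the log lines quoted in this thread.
    private static final DateTimeFormatter TS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");
    private static final int TS_LENGTH = 23; // e.g. "2020-10-28 12:22:37.879"

    // Largest pause between consecutive timestamped lines mentioning threadName.
    static Duration largestGap(List<String> lines, String threadName) {
        Duration largest = Duration.ZERO;
        LocalDateTime previous = null;
        for (String line : lines) {
            if (line.length() < TS_LENGTH || !line.contains(threadName)) {
                continue;
            }
            LocalDateTime current;
            try {
                current = LocalDateTime.parse(line.substring(0, TS_LENGTH), TS);
            } catch (DateTimeParseException e) {
                continue; // no leading timestamp, so treat as a continuation line
            }
            if (previous != null) {
                Duration gap = Duration.between(previous, current);
                if (gap.compareTo(largest) > 0) {
                    largest = gap;
                }
            }
            previous = current;
        }
        return largest;
    }
}
```

Calling largestGap(lines, "StreamThread-1") on the full log file would give
the longest stretch with no output from that thread.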

Sorry for the trouble, but thanks for bringing it to our attention.

On Wed, Oct 28, 2020 at 12:24 PM Alex Jablonski <ajablon...@thoughtworks.com>
wrote:

> This block:
>
> @EmbeddedKafka(
>         topics = {
>                 "WordCounts", "WordsForNumbers", "OutputTopic"
>         }
> )
>
> starts up an embedded Kafka in the test and creates the 3 topics (2
> input and 1 output). By default it creates them with 2 partitions
> each, but changing to 1 partition didn't alter the endless-rebalancing
> behavior.
>
> We also see the endless rebalancing behavior in a real Kafka cluster,
> using input and output topics that have already been created (and are
> readily consumed from and written to).
>
>
>
>
> On Wed, Oct 28, 2020 at 12:45 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
> > Yeah there's definitely something weird going on (assuming these are the
> > full logs over that time period). The last thing we see logged from the
> > StreamThread is this message from around the start of the task assignment
> > process:
> >
> > 2020-10-28 12:22:37.879 DEBUG 27226 --- [-StreamThread-1]
> > o.a.k.s.p.i.StreamsPartitionAssignor     : stream-thread
> > [demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer]
> > Constructed client metadata
> > {81060bdc-c8cc-4350-85f8-d238267e264e=ClientMetadata{hostInfo=null,
> > consumers=[demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer-976853d9-06ad-4515-abf3-2a7398c12006],
> > state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([])
> > prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([])
> > prevOwnedPartitionsByConsumerId: ([]) capacity: 1]}} from the member
> > subscriptions.
> >
> >
> >
> > which is at 12:22:37. Then there's nothing else from Streams until at
> > least 12:25:00, where the logs end. Not sure what it could be doing
> > inside the assignor for 2+ minutes without ever reaching another...how
> > many partitions are on the input topics? Are you sure the input topics
> > have been pre-created before starting the app, with the correct names,
> > etc?
> >
> > On Wed, Oct 28, 2020 at 10:29 AM Alex Jablonski <ajablon...@thoughtworks.com>
> > wrote:
> >
> > > Hi Sophie,
> > >
> > > Thanks for your questions! Responses inline below. Also, I realized I
> > > linked to the gradle file, not the interesting bits of the example. This
> > > <https://github.com/ajablonski/streams-issue-demo/blob/master/src/main/java/com/github/ajablonski/StreamsConfiguration.java>
> > > is the configuration and this
> > > <https://github.com/ajablonski/streams-issue-demo/blob/master/src/test/java/com/github/ajablonski/StreamsConfigurationTest.java>
> > > is the test.
> > >
> > > On Tue, Oct 27, 2020 at 10:11 PM Sophie Blee-Goldman <sop...@confluent.io>
> > > wrote:
> > >
> > >> >
> > >> > We've been able to get the crucial factors that cause this behavior
> > >> > down to a particular combination
> > >>
> > >> What do you mean by this -- that you only see this when all four of
> > >> those operators are at play? Or do you see it with any of them?
> > >>
> > >
> > > We see this when all four operators are in play. If you change the
> > > sample streams configuration to not do that final foreign key join, or
> > > not use custom serdes for example, I don't see the stuck-state issue
> > > (the application transitions to running state just fine).
> > >
> > >
> > >>
> > >> I guess the first thing to narrow down is whether it's actually
> > >> rebalancing or just restoring within this time (the REBALANCING state
> > >> is somewhat misleadingly-named). If this is a completely new app then
> > >> it's probably not restoring, but if this app had already been running
> > >> and building up state before hitting this issue then that's probably
> > >> the reason. It's not at all uncommon for restoration to take more than
> > >> 30 seconds.
> > >>
> > >
> > > This is happening with the app in a completely new state -- in the
> > > test, for example, there's no pre-loaded data when we're asserting that
> > > the app should eventually get to RUNNING, and none of the internal
> > > topics exist.
> > >
> > >
> > >> If it really is rebalancing this entire time, then you need to look
> > >> into the logs to figure out why. I don't see anything obviously wrong
> > >> with your particular application, and even if there was it should never
> > >> result in endless rebalances like this. How many instances of the
> > >> application are you running?
> > >>
> > >
> > > In our actual application, we have 3 instances, but in the tests in
> > > that sample project, there's only 1.
> > >
> > > The logs that we're getting right before the application gets "stuck"
> > > are below. The one that seems most concerning to my uninformed eyes is
> > > "Member
> > > demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> > > in group demo-application has failed". I've attached some DEBUG level
> > > logs too, though nothing was obvious to me that would better explain the
> > > hanging behavior.
> > >
> > > 2020-10-28 12:11:19.823  INFO 27127 --- [-StreamThread-1]
> > > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > > groupId=demo-application] Discovered group coordinator localhost:50343
> > > (id: 2147483647 rack: null)
> > > 2020-10-28 12:11:19.825  INFO 27127 --- [-StreamThread-1]
> > > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > > groupId=demo-application] (Re-)joining group
> > > 2020-10-28 12:11:19.842  WARN 27127 --- [-StreamThread-1]
> > > org.apache.kafka.clients.NetworkClient   : [Consumer
> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > > groupId=demo-application] Error while fetching metadata with
> > > correlation id 7 :
> > > {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION,
> > > demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION,
> > > demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
> > > 2020-10-28 12:11:19.860  INFO 27127 --- [-StreamThread-1]
> > > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > > groupId=demo-application] Join group failed with
> > > org.apache.kafka.common.errors.MemberIdRequiredException: The group
> > > member needs to have a valid member id before actually entering a
> > > consumer group
> > > 2020-10-28 12:11:19.861  INFO 27127 --- [-StreamThread-1]
> > > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > > groupId=demo-application] (Re-)joining group
> > > 2020-10-28 12:11:19.873  INFO 27127 --- [quest-handler-3]
> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> > > Preparing to rebalance group demo-application in state
> > > PreparingRebalance with old generation 0 (__consumer_offsets-2)
> > > (reason: Adding new member
> > > demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> > > with group instance id None)
> > > 2020-10-28 12:11:19.882  INFO 27127 --- [cutor-Rebalance]
> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> > > Stabilized group demo-application generation 1 (__consumer_offsets-2)
> > > 2020-10-28 12:11:19.947  WARN 27127 --- [-StreamThread-1]
> > > org.apache.kafka.clients.NetworkClient   : [Consumer
> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > > groupId=demo-application] Error while fetching metadata with
> > > correlation id 9 :
> > > {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION,
> > > demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION,
> > > demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
> > > 2020-10-28 12:11:23.462  INFO 27127 --- [er-event-thread]
> > > kafka.controller.KafkaController         : [Controller id=0] Processing
> > > automatic preferred replica leader election
> > > 2020-10-28 12:11:29.887  INFO 27127 --- [cutor-Heartbeat]
> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Member
> > > demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> > > in group demo-application has failed, removing it from the group
> > > 2020-10-28 12:11:29.887  INFO 27127 --- [cutor-Heartbeat]
> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> > > Preparing to rebalance group demo-application in state
> > > PreparingRebalance with old generation 1 (__consumer_offsets-2)
> > > (reason: removing member
> > > demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> > > on heartbeat expiration)
> > > 2020-10-28 12:11:29.888  INFO 27127 --- [cutor-Heartbeat]
> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Group
> > > demo-application with generation 2 is now empty (__consumer_offsets-2)
> > >
> > >
> > >>
> > >> Cheers,
> > >> Sophie
> > >>
> > >> On Thu, Oct 15, 2020 at 10:01 PM Alex Jablonski <ajablon...@thoughtworks.com>
> > >> wrote:
> > >>
> > >> > Hey there!
> > >> >
> > >> > My team and I have run across a bit of a jam in our application
> > >> > where, given a particular setup, our Kafka Streams application never
> > >> > seems to start successfully, instead just getting stuck in the
> > >> > REBALANCING state. We've been able to get the crucial factors that
> > >> > cause this behavior down to a particular combination of (1) grouping,
> > >> > (2) windowing, (3) aggregating, and (4) foreign-key joining, with some
> > >> > of those steps specifying Serdes besides the default.
> > >> >
> > >> > It's probably more useful to see a minimal example, so there's one
> > >> > here
> > >> > <https://github.com/ajablonski/streams-issue-demo/blob/master/build.gradle>.
> > >> > The underlying Kafka Streams version is 2.5.1. The first test should
> > >> > show the application eventually transition to running state, but it
> > >> > doesn't within the 30 second timeout I've set. Interestingly, getting
> > >> > rid of the 'Grouped.with' argument to the 'groupBy' function and the
> > >> > 'Materialized.with' in 'aggregate' in the 'StreamsConfiguration' lets
> > >> > the application transition to "RUNNING", though without the correct
> > >> > Serdes that's not too valuable.
> > >> >
> > >> > There might be a cleaner way to organize the particular flow in the
> > >> > toy example, but is there something fundamentally wrong with the
> > >> > approach laid out in that application that would cause Streams to be
> > >> > stuck in REBALANCING? I'd appreciate any advice folks could give!
> > >> >
> > >> > Thanks!
> > >> > Alex Jablonski
> > >> >
> > >>
> > >
> >
>
