This block:

@EmbeddedKafka(
        topics = {
                "WordCounts", "WordsForNumbers", "OutputTopic"
        }
)

starts up an embedded Kafka in the test and creates the 3 topics (2
input and 1 output). By default it creates them with 2 partitions
each, but changing to 1 partition didn't alter the endless-rebalancing
behavior.

We also see the endless rebalancing behavior in a real Kafka cluster,
using input and output topics that have already been created (and are
readily consumed from and written to).




On Wed, Oct 28, 2020 at 12:45 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Yeah there's definitely something weird going on (assuming this is the full
> logs over that
> time period). The last thing we see logged from the StreamThread is this
> message from
> around the start of the task assignment process:
>
> 2020-10-28 12:22:37.879 DEBUG 27226 --- [-StreamThread-1]
> o.a.k.s.p.i.StreamsPartitionAssignor     : stream-thread
>
> [demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer]
> Constructed client metadata
> {81060bdc-c8cc-4350-85f8-d238267e264e=ClientMetadata{hostInfo=null,
>
> consumers=[demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer-976853d9-06ad-4515-abf3-2a7398c12006],
> state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([])
> prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([])
> prevOwnedPartitionsByConsumerId: ([]) capacity: 1]}} from the member
> subscriptions.
>
>
>
> which is at 12:22:37. Then there's nothing else from Streams until at least
> 12:25:00,
> where the logs end. Not sure what it could be doing inside the assignor for
> 2+ minutes
> without ever reaching another...how many partitions are on the input
> topics? Are you
> sure the input topics have been pre-created before starting the app, with
> the correct
> names, etc?
>
> On Wed, Oct 28, 2020 at 10:29 AM Alex Jablonski <
> ajablon...@thoughtworks.com>
> wrote:
>
> > Hi Sophie,
> >
> > Thanks for your questions! Responses inline below. Also, I realized I
> > linked to the gradle file, not the interesting bits of the example. This
> > <
> https://github.com/ajablonski/streams-issue-demo/blob/master/src/main/java/com/github/ajablonski/StreamsConfiguration.java
> >
> > is the configuration and this
> > <
> https://github.com/ajablonski/streams-issue-demo/blob/master/src/test/java/com/github/ajablonski/StreamsConfigurationTest.java
> >
> > is the test.
> >
> > On Tue, Oct 27, 2020 at 10:11 PM Sophie Blee-Goldman <
> sop...@confluent.io>
> > wrote:
> >
> >> >
> >> > We've been able to get the crucial factors that cause this behavior
> >> down to
> >> > a particular combination
> >>
> >> What do you mean by this -- that you only see this when all four of
> those
> >> operators
> >> are at play? Or do you see it with any of them.
> >>
> >
> > We see this when all four operators are in play. If you change the sample
> > streams configuration to not do that final foreign key join, or not use
> > custom serdes for example, I don't see the stuck-state issue (the
> > application transitions to running state just fine).
> >
> >
> >>
> >> I guess the first thing to narrow down is whether it's actually
> >> rebalancing
> >> or just
> >> restoring within this time (the REBALANCING state is somewhat
> >> misleadingly-named).
> >> If this is a completely new app then it's probably not restoring, but if
> >> this app had
> >> already been running and building up state before hitting this issue
> then
> >> that's probably
> >> the reason. It's not at all uncommon for restoration to take more than
> 30
> >> seconds.
> >>
> >
> > This is happening with the app in a completely new state -- in the test,
> > for example, there's no pre-loaded data when we're asserting that the app
> > should eventually get to RUNNING, and none of the internal topics exist.
> >
> >
> >> If it really is rebalancing this entire time, then you need to look into
> >> the logs to figure
> >> out why. I don't see anything obviously wrong with your particular
> >> application, and even
> >> if there was it should never result in endless rebalances like this. How
> >> many instances
> >> of the application are you running?
> >>
> >
> > In our actual application, we have 3 instances, but in the tests in that
> > sample project, there's only 1.
> >
> > The logs that we're getting right before the application gets "stuck" are
> > below. The one that seems most concerning to my uninformed eyes is
> "Member
> >
> demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> > in group demo-application has failed". I've attached some DEBUG level
> logs
> > too, though nothing was obvious to me that would better explain the
> hanging
> > behavior.
> >
> > 2020-10-28 12:11:19.823  INFO 27127 --- [-StreamThread-1]
> > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> >
> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > groupId=demo-application] Discovered group coordinator localhost:50343
> (id:
> > 2147483647 rack: null)
> > 2020-10-28 12:11:19.825  INFO 27127 --- [-StreamThread-1]
> > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> >
> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > groupId=demo-application] (Re-)joining group
> > 2020-10-28 12:11:19.842  WARN 27127 --- [-StreamThread-1]
> > org.apache.kafka.clients.NetworkClient   : [Consumer
> >
> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > groupId=demo-application] Error while fetching metadata with correlation
> id
> > 7 :
> >
> {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION,
> > demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION,
> >
> demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
> > 2020-10-28 12:11:19.860  INFO 27127 --- [-StreamThread-1]
> > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> >
> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > groupId=demo-application] Join group failed with
> > org.apache.kafka.common.errors.MemberIdRequiredException: The group
> member
> > needs to have a valid member id before actually entering a consumer group
> > 2020-10-28 12:11:19.861  INFO 27127 --- [-StreamThread-1]
> > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> >
> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > groupId=demo-application] (Re-)joining group
> > 2020-10-28 12:11:19.873  INFO 27127 --- [quest-handler-3]
> > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> Preparing
> > to rebalance group demo-application in state PreparingRebalance with old
> > generation 0 (__consumer_offsets-2) (reason: Adding new member
> >
> demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> > with group instance id None)
> > 2020-10-28 12:11:19.882  INFO 27127 --- [cutor-Rebalance]
> > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> Stabilized
> > group demo-application generation 1 (__consumer_offsets-2)
> > 2020-10-28 12:11:19.947  WARN 27127 --- [-StreamThread-1]
> > org.apache.kafka.clients.NetworkClient   : [Consumer
> >
> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> > groupId=demo-application] Error while fetching metadata with correlation
> id
> > 9 :
> >
> {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION,
> > demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION,
> >
> demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
> > 2020-10-28 12:11:23.462  INFO 27127 --- [er-event-thread]
> > kafka.controller.KafkaController         : [Controller id=0] Processing
> > automatic preferred replica leader election
> > 2020-10-28 12:11:29.887  INFO 27127 --- [cutor-Heartbeat]
> > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Member
> >
> demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> > in group demo-application has failed, removing it from the group
> > 2020-10-28 12:11:29.887  INFO 27127 --- [cutor-Heartbeat]
> > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> Preparing
> > to rebalance group demo-application in state PreparingRebalance with old
> > generation 1 (__consumer_offsets-2) (reason: removing member
> >
> demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> > on heartbeat expiration)
> > 2020-10-28 12:11:29.888  INFO 27127 --- [cutor-Heartbeat]
> > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Group
> > demo-application with generation 2 is now empty (__consumer_offsets-2)
> >
> >
> >>
> >> Cheers,
> >> Sophie
> >>
> >> On Thu, Oct 15, 2020 at 10:01 PM Alex Jablonski <
> >> ajablon...@thoughtworks.com>
> >> wrote:
> >>
> >> > Hey there!
> >> >
> >> > My team and I have run across a bit of a jam in our application where,
> >> > given a particular setup, our Kafka Streams application never seems to
> >> > start successfully, instead just getting stuck in the REBALANCING
> state.
> >> > We've been able to get the crucial factors that cause this behavior
> >> down to
> >> > a particular combination of (1) grouping, (2) windowing, (3)
> >> aggregating,
> >> > and (4) foreign-key joining, with some of those steps specifying
> Serdes
> >> > besides the default.
> >> >
> >> > It's probably more useful to see a minimal example, so there's one
> here
> >> > <
> >>
> https://github.com/ajablonski/streams-issue-demo/blob/master/build.gradle
> >> > >.
> >> > The underlying Kafka Streams version is 2.5.1. The first test should
> >> show
> >> > the application eventually transition to running state, but it doesn't
> >> > within the 30 second timeout I've set. Interestingly, getting rid of
> the
> >> > 'Grouped.with' argument to the 'groupBy' function and the
> >> > 'Materialized.with' in 'aggregate' in the 'StreamsConfiguration' lets
> >> the
> >> > application transition to "RUNNING", though without the correct Serdes
> >> > that's not too valuable.
> >> >
> >> > There might be a cleaner way to organize the particular flow in the
> toy
> >> > example, but is there something fundamentally wrong with the approach
> >> laid
> >> > out in that application that would cause Streams to be stuck in
> >> > REBALANCING? I'd appreciate any advice folks could give!
> >> >
> >> > Thanks!
> >> > Alex Jablonski
> >> >
> >>
> >
>

Reply via email to