Ok, I tried to reproduce the issue with a minimal example and saw the same results. It seems like something about that exact topology is causing the assignment to get stuck. Maybe it produces an unexpected cycle in the topology that the assignor can't handle? It's especially odd that just removing the windowedBy fixes the issue, since a topology with a windowed aggregation is pretty much isomorphic to one with a plain aggregation.
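
For anyone else following along, the combination we're talking about boils down to roughly the sketch below. This is just an illustration of the operator shape, not the code from the linked repo -- the topic names, types, window size, and serdes are placeholders:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;

public class ReproSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> words = builder.stream(
                "WordCounts", Consumed.with(Serdes.String(), Serdes.String()));
        KTable<String, String> numbers = builder.table(
                "WordsForNumbers", Consumed.with(Serdes.String(), Serdes.String()));

        // (1) group, (2) window, (3) aggregate -- with explicit serdes on both steps
        KTable<Windowed<String>, Long> counts = words
                .groupBy((key, value) -> value,
                         Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .count(Materialized.with(Serdes.String(), Serdes.Long()));

        // (4) foreign-key join: the extractor derives the other table's key
        // from this table's value
        KTable<Windowed<String>, String> joined = counts.join(
                numbers,
                count -> Long.toString(count),
                (count, label) -> count + ":" + label);

        // Removing just the windowedBy(...) call above leaves an otherwise
        // equivalent non-windowed aggregation, which (per the thread) does not hang.
        // Sink omitted; printing the topology is enough to see the shape.
        System.out.println(builder.build().describe());
    }
}

Per the observations further down the thread, dropping the foreign-key join or the explicit serdes also avoids the stuck state, which is what makes this particular combination interesting.
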
Can you create a JIRA ticket for this and include your observations + a link to the example? It's definitely a bug, and we'll need to look into this to understand what's going wrong here. Sorry for the trouble, but thanks for bringing it to our attention.

On Wed, Oct 28, 2020 at 12:24 PM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
> This block:
>
> @EmbeddedKafka(
>     topics = {
>         "WordCounts", "WordsForNumbers", "OutputTopic"
>     }
> )
>
> starts up an embedded Kafka in the test and creates the 3 topics (2 input and 1 output). By default it creates them with 2 partitions each, but changing to 1 partition didn't alter the endless-rebalancing behavior.
>
> We also see the endless rebalancing behavior in a real Kafka cluster, using input and output topics that have already been created (and are readily consumed from and written to).
>
> On Wed, Oct 28, 2020 at 12:45 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > Yeah, there's definitely something weird going on (assuming this is the full logs over that time period). The last thing we see logged from the StreamThread is this message from around the start of the task assignment process:
> >
> > 2020-10-28 12:22:37.879 DEBUG 27226 --- [-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor : stream-thread [demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer] Constructed client metadata {81060bdc-c8cc-4350-85f8-d238267e264e=ClientMetadata{hostInfo=null, consumers=[demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer-976853d9-06ad-4515-abf3-2a7398c12006], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) capacity: 1]}} from the member subscriptions.
> >
> > which is at 12:22:37. Then there's nothing else from Streams until at least 12:25:00, where the logs end. Not sure what it could be doing inside the assignor for 2+ minutes without ever reaching another... how many partitions are on the input topics? Are you sure the input topics have been pre-created before starting the app, with the correct names, etc.?
> >
> > On Wed, Oct 28, 2020 at 10:29 AM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
> > > Hi Sophie,
> > >
> > > Thanks for your questions! Responses inline below. Also, I realized I linked to the gradle file, not the interesting bits of the example. This <https://github.com/ajablonski/streams-issue-demo/blob/master/src/main/java/com/github/ajablonski/StreamsConfiguration.java> is the configuration and this <https://github.com/ajablonski/streams-issue-demo/blob/master/src/test/java/com/github/ajablonski/StreamsConfigurationTest.java> is the test.
> > >
> > > On Tue, Oct 27, 2020 at 10:11 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > > > > We've been able to get the crucial factors that cause this behavior down to a particular combination
> > > >
> > > > What do you mean by this -- that you only see this when all four of those operators are at play? Or do you see it with any of them?
> > >
> > > We see this when all four operators are in play.
> > > If you change the sample streams configuration to not do that final foreign key join, or not use custom serdes for example, I don't see the stuck-state issue (the application transitions to running state just fine).
> > >
> > > > I guess the first thing to narrow down is whether it's actually rebalancing or just restoring within this time (the REBALANCING state is somewhat misleadingly-named). If this is a completely new app then it's probably not restoring, but if this app had already been running and building up state before hitting this issue then that's probably the reason. It's not at all uncommon for restoration to take more than 30 seconds.
> > >
> > > This is happening with the app in a completely new state -- in the test, for example, there's no pre-loaded data when we're asserting that the app should eventually get to RUNNING, and none of the internal topics exist.
> > >
> > > > If it really is rebalancing this entire time, then you need to look into the logs to figure out why. I don't see anything obviously wrong with your particular application, and even if there was it should never result in endless rebalances like this. How many instances of the application are you running?
> > >
> > > In our actual application, we have 3 instances, but in the tests in that sample project, there's only 1.
> > >
> > > The logs that we're getting right before the application gets "stuck" are below. The one that seems most concerning to my uninformed eyes is "Member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 in group demo-application has failed". I've attached some DEBUG level logs too, though nothing was obvious to me that would better explain the hanging behavior.
> > > 2020-10-28 12:11:19.823 INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Discovered group coordinator localhost:50343 (id: 2147483647 rack: null)
> > > 2020-10-28 12:11:19.825 INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] (Re-)joining group
> > > 2020-10-28 12:11:19.842 WARN 27127 --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Error while fetching metadata with correlation id 7 : {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION, demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION, demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
> > > 2020-10-28 12:11:19.860 INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
> > > 2020-10-28 12:11:19.861 INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] (Re-)joining group
> > > 2020-10-28 12:11:19.873 INFO 27127 --- [quest-handler-3] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Preparing to rebalance group demo-application in state PreparingRebalance with old generation 0 (__consumer_offsets-2) (reason: Adding new member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 with group instance id None)
> > > 2020-10-28 12:11:19.882 INFO 27127 --- [cutor-Rebalance] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Stabilized group demo-application generation 1 (__consumer_offsets-2)
> > > 2020-10-28 12:11:19.947 WARN 27127 --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Error while fetching metadata with correlation id 9 : {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION, demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION, demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
> > > 2020-10-28 12:11:23.462 INFO 27127 --- [er-event-thread] kafka.controller.KafkaController : [Controller id=0] Processing automatic preferred replica leader election
> > > 2020-10-28 12:11:29.887 INFO 27127 --- [cutor-Heartbeat] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 in group demo-application has failed, removing it from the group
> > > 2020-10-28 12:11:29.887 INFO 27127 --- [cutor-Heartbeat] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Preparing to rebalance group demo-application in state PreparingRebalance with old generation 1 (__consumer_offsets-2) (reason: removing member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 on heartbeat expiration)
> > > 2020-10-28 12:11:29.888 INFO 27127 --- [cutor-Heartbeat] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Group demo-application with generation 2 is now empty (__consumer_offsets-2)
> > >
> > > > Cheers,
> > > > Sophie
> > > >
> > > > On Thu, Oct 15, 2020 at 10:01 PM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
> > > > > Hey there!
> > > > >
> > > > > My team and I have run across a bit of a jam in our application where, given a particular setup, our Kafka Streams application never seems to start successfully, instead just getting stuck in the REBALANCING state. We've been able to get the crucial factors that cause this behavior down to a particular combination of (1) grouping, (2) windowing, (3) aggregating, and (4) foreign-key joining, with some of those steps specifying Serdes besides the default.
> > > > >
> > > > > It's probably more useful to see a minimal example, so there's one here <https://github.com/ajablonski/streams-issue-demo/blob/master/build.gradle>. The underlying Kafka Streams version is 2.5.1. The first test should show the application eventually transition to running state, but it doesn't within the 30 second timeout I've set. Interestingly, getting rid of the 'Grouped.with' argument to the 'groupBy' function and the 'Materialized.with' in 'aggregate' in the 'StreamsConfiguration' lets the application transition to "RUNNING", though without the correct Serdes that's not too valuable.
> > > > >
> > > > > There might be a cleaner way to organize the particular flow in the toy example, but is there something fundamentally wrong with the approach laid out in that application that would cause Streams to be stuck in REBALANCING? I'd appreciate any advice folks could give!
> > > > >
> > > > > Thanks!
> > > > > Alex Jablonski
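
For anyone reproducing this from the thread: the embedded broker setup described above creates each listed topic with 2 partitions by default, and spring-kafka-test's @EmbeddedKafka lets you pin that explicitly via its partitions attribute, which is what the "1 partition" variant mentioned earlier amounts to. A minimal sketch follows, with placeholder class and test names rather than the ones from the linked repo:

import org.junit.jupiter.api.Test;
import org.springframework.kafka.test.context.EmbeddedKafka;

// Placeholder class and test names -- this only illustrates pinning the
// partition count; the actual assertion lives in the linked StreamsConfigurationTest.
@EmbeddedKafka(
        partitions = 1,
        topics = { "WordCounts", "WordsForNumbers", "OutputTopic" }
)
class SinglePartitionStreamsTest {

    @Test
    void applicationShouldReachRunningState() {
        // Start the topology under test here and assert that KafkaStreams
        // reaches the RUNNING state within the timeout, as in the linked test.
    }
}
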