Hi Bruno/Lucas, A few comments related to consistency with the other KIPs. As we now have multiple types of group, we need to work out how to handle listing, describing and so on in a consistent way.
AS14: KIP-1043 introduced Admin#listGroups and this is intended as the way to list groups of all types. As a result, the specific methods and classes which applied to just one type of group have been removed. Admin#listShareGroups does not exist (any more); instead, Admin#listGroups with a filter on group type can be used. There is no ShareGroupListing, ListShareGroupsOptions or ListShareGroupsResult either. All of the equivalent stuff for consumer groups is being deprecated and removed by KIP-1043 (probably in AK 4.1). I suggest that you remove Admin#listStreamsGroups also. It is not necessary because Admin#listGroups can do it. AS15: Similarly, there is no ShareGroupState any longer, just GroupState. A streams group has a specific state, NOT_READY. I recommend that you add this state to GroupState, and use GroupState instead of having StreamsGroupState. During the development of KIP-1043, it became clear that having enumerations for all of the group types separately was cumbersome and it prevented a state-based filter on Admin#listGroups. AS16: The o.a.k.common.GroupType for streams should be STREAMS, not Streams. AS17: Please add streams groups to bin/kafka-groups.sh. Thanks, Andrew ________________________________________ From: Sophie Blee-Goldman <sop...@responsive.dev> Sent: 20 November 2024 08:15 To: dev@kafka.apache.org <dev@kafka.apache.org> Subject: Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol Thanks Lucas! That all sounds good to me. I think I officially have nothing left to say or ask about this KIP, so once you've updated the doc with what we've discussed in the past few messages then I'm personally feeling ready to vote on it. On Tue, Nov 19, 2024 at 11:10 PM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote: > Hi Sophie, > > S1. Yes, we'll include a list of missing topics or topic > misconfigurations in the heartbeat response (StatusDetail). Of course, > we will not only expose this via the streams group state metric, which > can be monitored, but also log the state of the group and the status > detail on the client, so it should be pretty straightforward for new > users to figure out the problem. > > As for the streams group state listener - the streams group state is > sent with the heartbeat response, so it's not directly possible to > implement it. Maybe it would make more sense to define the listener on > the `status` code sent in the heartbeat response, which includes > member-specific states (like STALE_TOPOLOGY). But yes, let's do this > in a separate KIP. I'll create a ticket for it (once we have accepted > this KIP and start creating tickets for KIP-1071). > > S2. Yes, the config `topology.epoch` would go to StreamsConfig. > StreamsGroupDescribe RPC would return the current epoch. As Bruno > suggested, the default of the `topology.epoch` would be 0 on the > client and the epoch would start at -1 on the broker (or 1 and 0, if > people prefer). So to first initialize a group, no config change would > be required. People who reset their application or change the > application ID upon redeploys would never have to touch the config. > > I think we don't need the topology ID to validate that the user did > not change the topology without bumping the epoch. We keep the > topology and topology epoch immutable during a client's lifetime, so > it's enough that we validate this once when joining. When the client > joins, we can check whether the topology_epoch & topology match the > broker-side. 
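As a concrete sketch of the client side of this proposal, here is what setting the epoch could look like, assuming the literal key "topology.epoch" discussed in this thread (there is no such StreamsConfig constant today) and the defaults Lucas describes:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class TopologyEpochExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Proposed config from this thread, not an existing constant. The client
            // default would be 0 and the broker-side epoch would start at -1, so a
            // brand-new group needs no change. To upgrade a topology: look up the
            // current epoch (StreamsGroupDescribe / CLI), set current + 1, and roll.
            props.put("topology.epoch", "3");

            StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input").to("output");
            new KafkaStreams(builder.build(), props).start();
        }
    }
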
All following heartbeats don't include the topology > anymore, but we do not need to "reverify" that the topology epoch & > topology are consistent, it is enough to compare epochs. > > Cheers, > Lucas > > On Wed, Nov 20, 2024 at 4:56 AM Sophie Blee-Goldman > <sop...@responsive.dev> wrote: > > > > Thanks Lucas! A few more questions to nail down the details but on the > > whole I think this is a good plan > > > > S1. I definitely agree that the current behavior is inconsistent and not > > ideal for missing topics. So I'm fine with changing this, just wanted to > > make sure it was intentional. That said I do think it would be a good > > exercise to think through the user experience here and what it will look > > like in practice for someone to debug why their app is not doing anything > > due to missing source topics. Especially because in my experience this > > tends to happen most with relatively new users who are just trying out > > Kafka Streams eg via the quickstart, or running and debugging in their > test > > environment, etc. Asking them to monitor the group state might be a lot > for > > some folks, especially if that has to be done via a separate cli they > > probably don't even know about. But I do think you're right that on the > > whole, the application should stay up and running and not immediately die > > if topics are missing. It sounds like we do plan to log a warning when > this > > happens as detected by the heartbeat, which helps. Would be even better > if > > we could return a list of the missing topic names in the heartbeat and > > include that in the logs. > > > > Beyond that, what if we introduced a new kind of listener similar to the > > KafkaStreams.State listener, but instead of listening in on the state of > > the client it listens on the state of the Streams group? This could be > done > > in a followup KIP of course, just pointing out that we can improve on > this > > in the future without modifying anything in the current KIP. I'd prefer > > this over introducing a timeout for missing topics,, personally. (If you > > don't want to include it in the current KIP let's just file a ticket for > it > > perhaps?) > > > > S2. Ok just to hammer out the details, we're proposing that the topology > > epoch be configurable via a new StreamsConfig, is that right? And that > > there will be an Admin API that can be used to fetch the current topology > > epoch for the group? > > > > How will this work if a user doesn't set the topology.epoch config? Do we > > just initialize it to 0 and assume that the topology will always be the > > same until/unless the user manually bumps it by the StreamsConfig? Would > we > > maybe want to keep the topology ID around so that we can verify the > > topology ID remains the same and matches for a given topology epoch? Or > > should we just leave it up to users to not screw with their topology > > without bumping the topology epoch? To be fair, that's more or less what > we > > do now, but it feels like if we introduce a concept of topology version > > then we should try to enforce it somehow. I'm not sure, what are your > > thoughts here? > > > > Or does each client send the entire topology in each heartbeat? That kind > > of seems like a lot of data to be passing around that often. We could > > replace the member.topology with member.topology_ID for purposes of > > verification (eg detecting the STREAMS_INVALID_TOPOLOGY_EPOCH case) and > get > > verification on every heartbeat without all the extra bytes. 
Probably not > > an issue most of the time but people can run some truly huge topologies. > > > > On Tue, Nov 19, 2024 at 2:47 AM Lucas Brutschy > > <lbruts...@confluent.io.invalid> wrote: > > > > > Hi Sophie, > > > > > > S1. You are reading it correctly. We added NOT_READY to bridge the > > > time when internal topics are being created. Right now, the > > > StreamsPartitioner just blocks until the internal topics are created, > > > but we don't want to block in the broker. We want the client to be > > > able to join while the internal topics may not be created, yet. > > > > > > We then went on to simplify the story about topic / topology > > > mismatches to treat all such errors the same way - we enter NOT_READY, > > > but keep the application running, and indicate the problem in the > > > heartbeats. The idea is that the user just monitors the status of the > > > group, which is exposed as a metric. > > > > > > We can try to make the behavior more like the StreamsPartitioner, but > > > its behavior is quite inconsistent. For example, missing source topics > > > are handled via an error code which shut down the application, but > > > problems when creating the internal topics, or topics not being > > > copartitioned, we just let TopologyException/TimeoutException fall > > > through (which I believe fails the group leader only). > > > > > > We could consider shutting down the application upon missing / > > > mispartitioned internal or source topics. I wonder if the simplest way > > > to do that would be to introduce a timeout on the client side. I think > > > the internal topic manager right now uses something like > > > max.poll.interval*2. But I'm not sure if shutting down the application > > > really simplifies things. > > > > > > S2. Let me try to add some details to the Bruno's proposal. Manually, > > > to upgrade a topology, the user would just deploy a new version of the > > > application, which has the new topology and a configuration change (in > > > the code or wherever) and deploy it. If the user doesn't know the > > > current topology epoch, they could fetch it using the streams groups > > > command-line tool, or potentially a UI. Automating in CD would work > > > the same way - determine the current topology epoch using admin API or > > > the command-line tool, set the config (by changing a file, or > > > launchdarkly or whatever your setup is) and deploy. It's definitely > > > not completely trivial, but it is an advanced feature, after all. I > > > made a rough sketch of error handling for topology epochs, I'll put > > > this in the "appendix" below since it's quite detailed. > > > > > > S3. If we are going for topology epochs, which is a nice proposal, I > > > am not sure if we actually need to have the topology ID anymore - > > > since we can store the topology epoch that each member is currently > > > at, and we cannot change the topology without bumping the epoch, the > > > point of the ID (detecting that a member is working with a stale > > > topology) can be replaced by comparing topology epochs. So if we go > > > down this route, I’d suggest removing topology IDs. We could introduce > > > them for naming purposes, but they are not required by the protocol, > > > so I'd rather do that later, if ever. > > > > > > Cheers, Lucas > > > > > > ## Appendix: error handling for topology epochs. > > > > > > We’ll require some new/changed error codes / statuses to handle the > > > updates correctly. 
> > > > > >  * Non-fatal status INCONSISTENT_TOPOLOGY is renamed to > > > STALE_TOPOLOGY, since we now have something like a topology > > > chronology. It is returned for members who are already part of the > > > group, but at an old topology epoch. > > > * Fatal error STREAMS_INVALID_TOPOLOGY_EPOCH if, for example, the > > > topology was changed but the epoch not bumped. > > > * Fatal error STREAMS_TOPOLOGY_FENCED if we are trying to join with > > > an outdated topology. > > > > > > So then all possible cases should be handled like this: > > > > > > * When a member joins (heartbeat.member_epoch = 0) > > > ** Require both heartbeat.topology and heartbeat.topology_epoch to be set. > > > ** If heartbeat.topology_epoch = group.topology_epoch + 1, the member > > > joins and updates topology ID/metadata/epoch for the group. > > > ** If heartbeat.topology_epoch = group.topology_epoch and > > > heartbeat.topology = group.topology, the member joins without updating > > > the topology ID/metadata/epoch for the group. > > > ** If heartbeat.topology_epoch = group.topology_epoch but > > > heartbeat.topology != group.topology, we will fail with error > > > STREAMS_INVALID_TOPOLOGY_EPOCH. > > > ** If heartbeat.topology_epoch > group.topology_epoch + 1, we fail > > > with new error STREAMS_INVALID_TOPOLOGY_EPOCH. > > > ** If heartbeat.topology_epoch < group.topology_epoch, we fail with > > > STREAMS_TOPOLOGY_FENCED. > > > * Otherwise (if heartbeat.member_epoch != 0) > > > ** Require that none of heartbeat.topology, heartbeat.topology_epoch > > > are set in the request, and let member.topology_epoch be the topology > > > epoch stored in the member metadata on the broker for that member. > > > * If member.topology_epoch < group.topology_epoch, the heartbeat is > > > successful, but we set the state STALE_TOPOLOGY. > > > * If member.topology_epoch > group.topology_epoch, this should be impossible. > > > * If member.topology_epoch = group.topology_epoch, the heartbeat is > > > successful. > > > > > > Common cases that could happen in practice (each of them should have a > > > different status / error code to be distinguishable): > > > > > > * When a user rolls the application with a new, different topology, > > > but without bumping the topology epoch, the roll > > > of the first client would directly fail with > > > STREAMS_INVALID_TOPOLOGY_EPOCH, which would be treated as fatal and > > > cause the first client to not come up - which causes the user to > > > correctly detect the error. > > > * When a user rolls the application with a new topology epoch / topology > > > ** The first client being rolled will bump the topology epoch > > > ** All other clients will see that they are using an outdated > > > topology by receiving a non-fatal STALE_TOPOLOGY error > > > ** If any of the stale clients have to rejoin the group before being > > > updated to the new topology, they will fail fatally with > > > STREAMS_TOPOLOGY_FENCED. > > > > > > > > > On Mon, Nov 18, 2024 at 12:53 PM Nick Telford <nick.telf...@gmail.com> > > > wrote: > > > > > > > > Actually, scratch that. On reflection I think I prefer Bruno's original > > > > idea to specify it in the configuration. 
> > > > > > > > Cheers, > > > > Nick > > > > > > > > On Sat, 16 Nov 2024 at 17:59, Nick Telford <nick.telf...@gmail.com> > > > wrote: > > > > > > > > > Hey everyone, > > > > > > > > > > With respect to Bruno's proposal, could instances cache their > topology > > > > > epoch on disk, and then upgrades/downgrades would simply involve > > > deleting > > > > > the cached epoch before starting the instance? > > > > > > > > > > My thinking is that this might be potentially simpler for users > than > > > > > modifying configuration. > > > > > > > > > > Cheers, > > > > > Nick > > > > > > > > > > On Sat, 16 Nov 2024, 00:41 Sophie Blee-Goldman, < > sop...@responsive.dev > > > > > > > > > wrote: > > > > > > > > > >> Thanks for the updates! Few minor questions before we wrap this > up and > > > > >> move > > > > >> to a vote: > > > > >> > > > > >> S1. Can you clarify the outward-facing behavior of a group that > > > enters the > > > > >> NOT_READY state due to, say, missing source topics? It sounds > like the > > > > >> application will continue to run without processing anything, > which > > > would > > > > >> be a change compared to today where we shut down with an explicit > > > error. > > > > >> Am > > > > >> I reading this correctly and is this change intentional? > > > > >> > > > > >> S2. Regarding topology upgrades, I am fully in favor of Bruno's > > > proposal. > > > > >> Just feel like we need to flesh it out a bit. For example you > say "no > > > > >> specific tooling" is needed, but how would a user go about > upgrading > > > the > > > > >> topology? If we can just describe this process in a bit more > detail > > > then I > > > > >> would feel good. > > > > >> > > > > >> S3. Lastly, just a quick reminder, we should make sure to clarify > in > > > the > > > > >> KIP how the topology ID will be computed since that impacts which > > > kind of > > > > >> upgrades are possible and is therefore part of the public API. > IIUC > > > the > > > > >> plan is to compute it from the topics which makes sense to me > (though > > > I do > > > > >> think it should eventually be pluggable, perhaps in a followup > KIP) > > > > >> > > > > >> On Thu, Nov 7, 2024 at 10:08 AM Bruno Cadonna <cado...@apache.org > > > > > wrote: > > > > >> > > > > >> > Hi Lucas and all others, > > > > >> > > > > > >> > Thanks for the proposals regarding the topology upgrades! > > > > >> > > > > > >> > I have an additional proposal: > > > > >> > > > > > >> > We introduce a topology epoch alongside the topology ID. The > > > topology > > > > >> > epoch is set to 0 on the group coordinator for a new group. The > > > topology > > > > >> > epoch is also a config within the Streams client, the default > is 1. > > > > >> > > > > > >> > When the Streams client starts up, it sends its topology epoch > > > alongside > > > > >> > the topology ID and the topology to the group coordinator. The > group > > > > >> > coordinator initializes the topology with the given topology ID > and > > > > >> > increases the topology epoch to 1. > > > > >> > > > > > >> > When the client wants to upgrade a topology it needs to send a > valid > > > > >> > topology epoch alongside the new topology ID and the topology. > We > > > can > > > > >> > decide if the new topology epoch = current topology at group > > > coordinator > > > > >> > + 1 or just new topology epoch > current topology on group > > > coordinator. 
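A small sketch of the validation rule Bruno describes above, assuming the stricter "current epoch + 1" variant; the method and parameter names are illustrative only, not actual group coordinator code:

    // Illustrative only: decide whether a heartbeat carrying a different
    // topology ID may re-initialize the group's topology.
    static boolean acceptTopologyUpdate(String receivedTopologyId, int receivedEpoch,
                                        String groupTopologyId, int groupEpoch) {
        if (receivedTopologyId.equals(groupTopologyId)) {
            return false; // same topology as already initialized, nothing to update
        }
        // Stricter variant: only an epoch bumped by exactly one is valid.
        // The relaxed variant mentioned above would be: receivedEpoch > groupEpoch.
        return receivedEpoch == groupEpoch + 1;
    }
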
> > > > >> > > > > > >> > When the group coordinator receives a topology ID that differs > from > > > the > > > > >> > one intialized for the group, the group coordinator only > upgrades > > > to the > > > > >> > new topology if the topology epoch is valid. Then, it sets the > > > topology > > > > >> > epoch to the new value. If the topology epoch is not valid, that > > > means > > > > >> > the topology ID is an old one and the group coordinator does not > > > > >> > re-initialize the topology. > > > > >> > > > > > >> > The current topology epoch can be retrieved with the describe > RPC. > > > > >> > > > > > >> > Accidental rollbacks of topologies are not be possible. Only > wanted > > > > >> > rollbacks of topologies with an appropriate topology epoch are > > > allowed. > > > > >> > > > > > >> > Specific tooling is not needed. > > > > >> > > > > > >> > The topology upgrade can be automated in CI in the following > way: > > > > >> > > > > > >> > 1. Call streams group describe to determine the current epoch. > > > > >> > > > > > >> > 2. Set the Streams client config topology.epoch = current epoch > + 1 > > > > >> > > > > > >> > 3. Roll the application. > > > > >> > > > > > >> > WDYT? > > > > >> > > > > > >> > Best, > > > > >> > Bruno > > > > >> > > > > > >> > > > > > >> > On 07.11.24 18:26, Lucas Brutschy wrote: > > > > >> > > Hi all, > > > > >> > > > > > > >> > > I have updated the KIP with some details around how we handle > the > > > > >> > > cases when essential topics required by the topology are not > > > present. > > > > >> > > This is described in a new section "Handling topic topology > > > > >> > > mismatches". The short summary is that we enter a state where > the > > > > >> > > group member's heartbeats will succeed, but no assignments > will be > > > > >> > > made, and the nature of the topic misconfiguration is > > > communicated to > > > > >> > > the members using the `status` field of the heartbeat > response. > > > > >> > > > > > > >> > > Working towards putting up this KIP up for a vote, there was > one > > > last > > > > >> > > concern mentioned during our last sync, which was a concern > > > raised by > > > > >> > > Sophie and others around topology updating. I would like to > > > propose > > > > >> > > the following changes to the KIP to solidify the story around > > > topology > > > > >> > > updating. > > > > >> > > > > > > >> > > To recap, a member sends its topology metadata every time it > > > attempts > > > > >> > > to join a streams group. When the group already has a topology > > > set, > > > > >> > > the KIP proposes to update the topology for the group > immediately > > > with > > > > >> > > that first heartbeat of a member. All other members will then > > > receive > > > > >> > > a non-fatal group status INCONSISTENT_TOPOLOGY, letting them > know > > > that > > > > >> > > their topology is not equal to the groups topology anymore. > Those > > > > >> > > members with an outdated topology will be able to retain their > > > current > > > > >> > > tasks, but will not receive new tasks, until they have also > > > updated > > > > >> > > their topology. This enables a rolling bounce to upgrade the > > > topology. > > > > >> > > > > > > >> > > There is a concern with this behavior, raised both in our > > > discussions > > > > >> > > but also by Guozhang and Sophie. The problem is that a > > > “misbehaving” > > > > >> > > member can always roll back a topology upgrade, if it is still > > > running > > > > >> > > with the old version of the topology. 
For example, if we roll > 20 > > > > >> > > members to a new topology version, and we have successfully > > > restarted > > > > >> > > 19 out of 20, but the last member is restarted with the old > > > topology, > > > > >> > > this will cause the topology version on the broker to be > rolled > > > back. > > > > >> > > This could happen, for example, because topology update fails > on > > > the > > > > >> > > last member, or the last member runs into a fatal error of > some > > > kind > > > > >> > > (out-of-memory or being fenced, for example). Once the failed > > > member > > > > >> > > rejoins, it updates the topology on the broker to the “old” > > > topology, > > > > >> > > so performs an involuntary roll-back. Now, we would be in a > > > situation > > > > >> > > where 19 out of 20 members are blocked from getting new tasks > > > > >> > > assigned. We would be in this state just temporary, until the > > > final, > > > > >> > > misbehaving member is restarted correctly with the new > topology. > > > > >> > > > > > > >> > > The original suggestion in the KIP discussion to solve these > > > issues > > > > >> > > (for now) was to exclude topology updates from the KIP. > However, > > > it > > > > >> > > was confirmed by several people (such as Anna) that topology > > > updates > > > > >> > > are quite common in the wild, so for having a production-ready > > > > >> > > implementation of the new protocol, a mechanism for topology > > > updates > > > > >> > > is required. It is not clear how strict the requirement for > > > rolling > > > > >> > > updates in the first version of the protocol. No ‘better’ > > > alternative > > > > >> > > for updating topologies was suggested. > > > > >> > > > > > > >> > > Proposal 1) Making topology updating optional: We modify the > > > heartbeat > > > > >> > > request to allow joining a group without sending a topology > (only > > > a > > > > >> > > topology ID). When joining a group this way, the member is > > > explicitly > > > > >> > > requesting to not update the topology of the group. In other > > > words, > > > > >> > > leaving the topology as null, will mean that no topology > update > > > will > > > > >> > > be performed on the broker side. If the topology ID does not > > > match the > > > > >> > > topology ID of the group, and no topology update is performed, > > > and the > > > > >> > > member that joined will just behave like any other member > with an > > > > >> > > inconsistent topology ID. That is, it will not get any tasks > > > assigned. > > > > >> > > Streams clients will never send the topology when they know > they > > > are > > > > >> > > just rejoining the group, e.g. because the client was fenced > and > > > > >> > > dropped out of the group. That means, a topology update is > only > > > > >> > > initiated when a streams application is first started up. This > > > will > > > > >> > > prevent "accidental" topology downgrades if we know that we > are > > > > >> > > rejoining the group. However, if we rejoin the group without > > > knowing > > > > >> > > that we are just rejoining (e.g. since the whole pod was > > > restarted), > > > > >> > > we could still run into temporary topology downgrades. 
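A rough sketch of the client-side rule in Proposal 1; the record and field names below are placeholders for illustration, not the heartbeat schema defined in the KIP:

    // Placeholder type, not the actual StreamsGroupHeartbeat request schema.
    record JoinHeartbeat(int memberEpoch, String topologyId, byte[] topology) {}

    static JoinHeartbeat buildJoinHeartbeat(boolean firstStartup,
                                            String topologyId,
                                            byte[] encodedTopology) {
        // Only a freshly started client sends the topology and may therefore
        // update the group's topology. A rejoin (e.g. after being fenced) sends
        // the topology ID only, so it can never roll the group's topology back;
        // on a mismatch it simply gets no tasks assigned.
        return firstStartup
                ? new JoinHeartbeat(0, topologyId, encodedTopology)
                : new JoinHeartbeat(0, topologyId, null);
    }
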
> > > > >> > > > > > > >> > > Proposal 2) Disable topology updates by default on the client: > > > > >> > > Second, we can disable topology updates by default from the > client > > > > >> > > side, by extending the streams configuration with a config > > > > >> > > enable_topology_updates which defaults to false and extending > the > > > > >> > > heartbeat RPC by a flag UpdateTopology which propagates that > > > config > > > > >> > > value to the broker. The advantage of having a client-side > config > > > is, > > > > >> > > that, during a roll, I can set the configuration only for > updated > > > > >> > > clients, which can ensure that no accidental rollbacks > happen. The > > > > >> > > downside is that after the roll, I would have to do a second > roll > > > to > > > > >> > > disable enable_topology_updates again, so that we are safe > during > > > the > > > > >> > > next roll again. However, avoiding the second roll is more of > a > > > > >> > > usability optimization, which is conceptually more complex and > > > could > > > > >> > > address in a follow-up KIP, as in proposal 3. > > > > >> > > > > > > >> > > Proposal 3) Fixing the topology ID during upgrades (follow-up > > > KIP): To > > > > >> > > fully prevent accidental downgrades, one could introduce a > > > broker-side > > > > >> > > group-level config that restricts the possible topology IDs > that a > > > > >> > > client can upgrade to. This would be quite similar in safety > as > > > making > > > > >> > > topology updates manual. For updating a topology, the user > would > > > have > > > > >> > > to first, extract the topology ID from the update topology - > this > > > > >> > > could e.g. be logged when the application is started, second, > set > > > the > > > > >> > > group-level config using an admin client (something like > > > > >> > > group.streams.allowed_upgrade_topology_id), third, roll the > > > > >> > > application. This would require quite some extra tooling (how > to > > > get > > > > >> > > the topology ID), so I’d propose introducing this in a > follow-up > > > KIP. > > > > >> > > > > > > >> > > Let me know if there are any concerns with this approach. > > > > >> > > > > > > >> > > Cheers, > > > > >> > > Lucas > > > > >> > > > > > > >> > > On Mon, Oct 7, 2024 at 9:42 AM Bruno Cadonna < > cado...@apache.org> > > > > >> wrote: > > > > >> > >> > > > > >> > >> Hi all, > > > > >> > >> > > > > >> > >> we did some major changes to this KIP that we would like the > > > > >> community > > > > >> > >> to review. The changes are the following: > > > > >> > >> > > > > >> > >> 1. We merged the Streams group initialize RPC into the > Streams > > > group > > > > >> > >> heartbeat RPC. We decided to merge them because it > simplifies the > > > > >> > >> sychronization between initialization and heartbeat during > group > > > > >> > >> creation and migration from a classic group. A consequence > of the > > > > >> merge > > > > >> > >> is that the heartbeat now needs permissions to create and > > > describe > > > > >> > >> topics which before only the initialize call had. But we > think > > > that > > > > >> is > > > > >> > >> not a big deal. A consumer of a Streams client will send the > > > > >> metadata of > > > > >> > >> its topology in its first heartbeat when it joins the group. > > > After > > > > >> that > > > > >> > >> it will not send the topology metadata anymore. > > > > >> > >> > > > > >> > >> 2. 
We discovered that to create the internal topics, the > topology > > > > >> > >> metadata also need to include the co-partition groups since > we > > > cannot > > > > >> > >> infer the co-partitioning solely from the sub-topologies and > the > > > > >> input > > > > >> > >> topics of the sub-topologies. > > > > >> > >> > > > > >> > >> 3. We added a section that describes how the migration from a > > > classic > > > > >> > >> group to a streams group and back works. This is very > similar to > > > how > > > > >> the > > > > >> > >> migration in KIP-848 works. > > > > >> > >> > > > > >> > >> > > > > >> > >> Please give us feedback about the changes and the KIP in > general. > > > > >> > >> > > > > >> > >> We would like to start a vote on this KIP as soon as > possible. > > > > >> > >> > > > > >> > >> Best, > > > > >> > >> Bruno > > > > >> > >> > > > > >> > >> On 11.09.24 01:06, Sophie Blee-Goldman wrote: > > > > >> > >>> Just following up to say thank you Lucas for the detailed > > > > >> > explanations, and > > > > >> > >>> especially the thorough response to my more "existential" > > > questions > > > > >> > about > > > > >> > >>> building off 848 vs introducing a separate Streams group > > > protocol. > > > > >> This > > > > >> > >>> really helped me understand the motivations behind this > > > decision. No > > > > >> > notes! > > > > >> > >>> > > > > >> > >>> On Fri, Sep 6, 2024 at 7:17 AM Lucas Brutschy > > > > >> > >>> <lbruts...@confluent.io.invalid> wrote: > > > > >> > >>> > > > > >> > >>>> Hi Sophie, > > > > >> > >>>> > > > > >> > >>>> thanks for getting deeply involved in this discussion. > > > > >> > >>>> > > > > >> > >>>> S16. Adding tagged fields would not require an RPC version > > > bump at > > > > >> > >>>> all. This should already a cover a lot of use cases where > > > > >> requestion > > > > >> > >>>> versioning wouldn't become an issue, as long as it is safe > to > > > > >> ignore > > > > >> > >>>> the new fields. Other than that, I'm not aware of a formal > > > > >> definition > > > > >> > >>>> of the what request version bumps mean in relation to > Apache > > > Kafka > > > > >> > >>>> versions - if anybody knows, please let me know. In my > > > > >> understanding, > > > > >> > >>>> the Kafka protocol evolves independently of AK versions, > and > > > for > > > > >> the > > > > >> > >>>> Kafka protocol to change, you require an accepted KIP, and > not > > > an > > > > >> AK > > > > >> > >>>> release. The grey area is that a protocol change without > an AK > > > > >> release > > > > >> > >>>> does not mean much, because even if cloud vendors deploy it > > > before > > > > >> an > > > > >> > >>>> AK release, virtually no clients would support it and the > > > protocol > > > > >> > >>>> would fall back to the earlier RPC version. So maybe this > isn't > > > > >> that > > > > >> > >>>> clearly formalized because it does not matter much in > practice. > > > > >> > >>>> Normally, I would expect an RPC version bump after each KIP > > > that > > > > >> > >>>> breaks RPC compatibility. But there seems to be some > > > flexibility > > > > >> here > > > > >> > >>>> - for example, the KIP-848 RPCs were changed after the > fact in > > > > >> trunk, > > > > >> > >>>> because they were not exposed yet by default in AK (or any > > > other > > > > >> > >>>> implementation of the Kafka protocol), so a breaking change > > > without > > > > >> > >>>> request version bump was probably not deemed problematic. > > > > >> > >>>> > > > > >> > >>>> S17/18. 
We can add an informational table for the > interface to > > > the > > > > >> > >>>> assignor, and that should also make clear how a client-side > > > > >> assignment > > > > >> > >>>> interface would look. Generally, our idea would be to send > all > > > the > > > > >> > >>>> input information for the task assignor to a client and > expect > > > all > > > > >> the > > > > >> > >>>> output information of a task assignor in return, very > similar > > > to > > > > >> > >>>> KIP-848. We did not include such a table because we do not > > > define a > > > > >> > >>>> public interface to define custom broker-side assignors in > this > > > > >> KIP. > > > > >> > >>>> But generally, the input / output of the (task) assignor > mostly > > > > >> > >>>> follows the data fields sent to / from the brokers. Let me > do > > > that, > > > > >> > >>>> but first I'll respond to the rest of your questions that > seem > > > more > > > > >> > >>>> important. > > > > >> > >>>> > > > > >> > >>>> S19. I think an explicit trigger for rebalance could make > > > sense, > > > > >> but > > > > >> > >>>> let me explain the thinking why we haven't included this > in the > > > > >> > >>>> current KIP: We don't want to include it for client-side > > > assignment > > > > >> > >>>> obviously, because this KIP does not include client-side > > > > >> assignment. > > > > >> > >>>> For triggering a rebalance when a warm-up has caught up, > this > > > is > > > > >> > >>>> treated differently in the KIP: the task assignor is called > > > when a > > > > >> > >>>> client sends updated task offsets. The client does not > update > > > its > > > > >> task > > > > >> > >>>> offsets regularly, but according to an interval > > > > >> > >>>> `task.offset.interval.ms`, but also immediately when a > client > > > > >> notices > > > > >> > >>>> that a warm-up has caught up. So effectively, it is mostly > the > > > > >> client > > > > >> > >>>> that is constantly checking the lag. I absolutely believe > that > > > the > > > > >> > >>>> client should be checking for `acceptable.recovery.lag`, > and > > > we are > > > > >> > >>>> using the presence of the task offset field to trigger > > > > >> reassignment. > > > > >> > >>>> What we weren't sure is how often we need the task offsets > to > > > be > > > > >> > >>>> updated and how often we want to retrigger the assignment > > > because > > > > >> of > > > > >> > >>>> that. An explicit field for "forcing" a reassignment may > still > > > make > > > > >> > >>>> sense, so that we can update the task offsets on the broker > > > more > > > > >> often > > > > >> > >>>> than we call the assignor. However, I'm not convinced that > this > > > > >> should > > > > >> > >>>> just be a generic "rebalance trigger", as this seems to be > > > going > > > > >> > >>>> against the goals of KIP-848. Generally, I think one of the > > > > >> principles > > > > >> > >>>> of KIP-848 is that a member just describes its current > state > > > and > > > > >> > >>>> metadata, and the group coordinator makes a decision on > whether > > > > >> > >>>> reassignment needs to be triggered. So it would seem more > > > natural > > > > >> to > > > > >> > >>>> just indicate a boolean "I have caught-up warm-up tasks" > to the > > > > >> group > > > > >> > >>>> coordinator. 
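A tiny sketch of the client-side check being described here; acceptable.recovery.lag and task.offset.interval.ms are the configs mentioned above, while the method name and lag bookkeeping are illustrative:

    // Illustrative: a warm-up task counts as "caught up" once its remaining lag
    // is within acceptable.recovery.lag, at which point the client reports its
    // task offsets immediately instead of waiting for task.offset.interval.ms.
    static boolean warmupCaughtUp(long endOffset, long checkpointedOffset,
                                  long acceptableRecoveryLag) {
        return endOffset - checkpointedOffset <= acceptableRecoveryLag;
    }
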
I understand that for client-side assignment, > you > > > may > > > > >> > >>>> want to trigger assignment explicitely without the group > > > > >> coordinator > > > > >> > >>>> understanding why, but then we should add such a field as > part > > > of > > > > >> the > > > > >> > >>>> client-side assignment KIP. > > > > >> > >>>> > > > > >> > >>>> S20. Yes, inside the group coordinator, the main > differences > > > are > > > > >> > >>>> topology initialization handling, and the semantics of task > > > > >> assignment > > > > >> > >>>> vs. partition assignment. It will not be a completely > separate > > > > >> group > > > > >> > >>>> coordinator, and we will not change things like the offset > > > reading > > > > >> and > > > > >> > >>>> writing. What will have its own code path is the handling > of > > > > >> > >>>> heartbeats and the reconciliation of the group (so, which > > > tasks are > > > > >> > >>>> sent to which member at which point in time until we reach > the > > > > >> > >>>> declarative target assignment that was produced by a > > > client-side or > > > > >> > >>>> server-side assignor). Also, the assignor implementations > will > > > > >> > >>>> obviously be completely separate from the consumer group > > > assignors, > > > > >> > >>>> but that is already the case currently. Streams groups will > > > > >> co-exist > > > > >> > >>>> in the group coordinator with share groups (for queues), > > > classic > > > > >> > >>>> groups (old generic protocol for consumers and connect) and > > > > >> consumer > > > > >> > >>>> groups (from KIP-848), and potentially a new group type for > > > > >> connect as > > > > >> > >>>> well. Each of these group types have a separate code path > for > > > heart > > > > >> > >>>> beating, reconciling assignments, keeping track of the > internal > > > > >> state > > > > >> > >>>> of the group and responding to specific requests from admin > > > API, > > > > >> like > > > > >> > >>>> ConsumerGroupDescribe. For streams, there will be a certain > > > amount > > > > >> of > > > > >> > >>>> duplication from consumer groups - for example, liveness > > > tracking > > > > >> for > > > > >> > >>>> members (setting timers for the session timeout etc.) is > > > > >> essentially > > > > >> > >>>> the same between both groups, and the reconciliation logic > is > > > quite > > > > >> > >>>> similar. We will make an effort to refactor as much of this > > > code > > > > >> in a > > > > >> > >>>> way that it can be reused by both group types, and a lot of > > > things > > > > >> > >>>> have already been made generic with the introduction of > share > > > > >> groups > > > > >> > >>>> in the group coordinator. > > > > >> > >>>> > > > > >> > >>>> S21. It's nice that we are discussing "why are we doing > this" > > > in > > > > >> > >>>> question 21 :D. I think we are touching upon the reasoning > in > > > the > > > > >> KIP, > > > > >> > >>>> but let me explain the reasoning in more detail. You are > > > right, the > > > > >> > >>>> reason we want to implement custom RPCs for Kafka Streams > is > > > not > > > > >> about > > > > >> > >>>> broker-side vs. client-side assignment. 
The broker could > > > implement > > > > >> the > > > > >> > >>>> Member Metadata versioning scheme of Kafka Streams, > intercept > > > > >> consumer > > > > >> > >>>> group heartbeats that look like Kafka Streams heartbeats, > > > > >> deserialize > > > > >> > >>>> the custom metadata and run a broker-side assignor for > Kafka > > > > >> Streams, > > > > >> > >>>> and finally serialize custom Kafka Streams assignment > > > metadata, all > > > > >> > >>>> based on KIP-848 RPCs. > > > > >> > >>>> > > > > >> > >>>> First of all, the main difference we see with custom RPCs > is > > > really > > > > >> > >>>> that streams groups become a first-level concept for both > the > > > > >> broker > > > > >> > >>>> and clients of the Kafka. This also goes into the answer > for > > > S20 > > > > >> (what > > > > >> > >>>> is different beside topology initialization and task > > > > >> reconciliation). > > > > >> > >>>> With streams groups, I can use the Admin API call to see > the > > > > >> current > > > > >> > >>>> assignment, the target assignment, all the metadata > necessary > > > for > > > > >> > >>>> assignment, and some information about the current > topology in > > > a > > > > >> > >>>> human-readable and machine-readable form, instead of > having to > > > > >> parse > > > > >> > >>>> it from logs on some client machine that may or may not > have > > > the > > > > >> right > > > > >> > >>>> log-level configured. This will help humans to understand > what > > > is > > > > >> > >>>> going on, but can also power automated tools / CLIs / UIs, > and > > > I > > > > >> think > > > > >> > >>>> this is a major improvement for operating a streams > > > application. > > > > >> > >>>> Again, this is not about who is in control of the broker > or the > > > > >> > >>>> clients, the information becomes accessible for everybody > who > > > can > > > > >> call > > > > >> > >>>> the Admin API. It also does not depend on the cloud > vendor, it > > > > >> will be > > > > >> > >>>> an improvement available for pure AK users, and will also > work > > > with > > > > >> > >>>> client-side assignment (although obviously, if there is an > > > > >> intention > > > > >> > >>>> to start encoding proprietary data in black-box > byte-arrays, > > > this > > > > >> > >>>> information remains hidden to users of the open source > > > distribution > > > > >> > >>>> again). > > > > >> > >>>> > > > > >> > >>>> I also think that the original proposal of reusing the new > > > consumer > > > > >> > >>>> group protocol for Kafka Streams has some downsides, that, > > > while > > > > >> not > > > > >> > >>>> being deal-breakers, would make the new behaviour of Kafka > > > Streams > > > > >> > >>>> less than ideal. One example is the reconciliation: if we > have > > > a > > > > >> > >>>> custom stream partition assignor (client- or broker-side > > > doesn't > > > > >> > >>>> matter), we can only control the target assignment that the > > > group > > > > >> > >>>> converges to, but none of the intermediate states that the > > > group > > > > >> > >>>> transitions through while attempting to reach the desired > end > > > > >> state. > > > > >> > >>>> For example, there is no way to prevent the reconciliation > > > loop of > > > > >> > >>>> KIP-848 from giving an active task to processX before > another > > > > >> instance > > > > >> > >>>> on processX has revoked the corresponding standby task. 
> There > > > is no > > > > >> > >>>> synchronization point where these movements are done at the > > > same > > > > >> time > > > > >> > >>>> as in the classic protocol, and on the broker we can't > prevent > > > > >> this, > > > > >> > >>>> because the reconciliation loop in KIP-848 does not know > what a > > > > >> > >>>> standby task is, or what a process ID is. You could argue, > you > > > can > > > > >> let > > > > >> > >>>> the application run into LockExceptions in the meantime, > but it > > > > >> will > > > > >> > >>>> be one of the things that will make Kafka Streams work less > > > than > > > > >> > >>>> ideal. Another example is task offsets. It is beneficial > for > > > > >> streams > > > > >> > >>>> partition assignors to have relatively recent task offset > > > > >> information, > > > > >> > >>>> which, in KIP-848, needs to be sent to the assignor using a > > > > >> black-box > > > > >> > >>>> MemberMetadata byte array. This member metadata byte array > is > > > > >> > >>>> persisted to the consumer offset topic every time it > changes > > > (for > > > > >> > >>>> failover). So we end up with a bad trade-off: if we send > the > > > task > > > > >> > >>>> offset information to the broker very frequently, we will > have > > > to > > > > >> > >>>> write those member metadata records very frequently as > well. > > > If we > > > > >> do > > > > >> > >>>> not send them frequently, streams task assignors will have > very > > > > >> > >>>> outdated lag information. In KIP-1071 we can make the > decision > > > to > > > > >> not > > > > >> > >>>> persist the task offset information, which avoids this > > > trade-off. > > > > >> It > > > > >> > >>>> gives us up-to-date lag information in the usual case, but > > > after > > > > >> > >>>> fail-over of the group coordinator, our lag information > may be > > > > >> missing > > > > >> > >>>> for a short period of time. These are two examples, but I > > > think the > > > > >> > >>>> general theme is, that with custom RPCs, Kafka Streams is > in > > > > >> control > > > > >> > >>>> of how the group metadata is processed and how the tasks > are > > > > >> > >>>> reconciled. This is not just a consideration for now, but > also > > > the > > > > >> > >>>> future: will KIP-848 reconciliation and metadata management > > > always > > > > >> > >>>> work "good enough" for Kafka Streams, or will there be use > > > cases > > > > >> where > > > > >> > >>>> we want to evolve and customize the behaviour of Kafka > Streams > > > > >> groups > > > > >> > >>>> independently of consumer groups. > > > > >> > >>>> > > > > >> > >>>> A final point that I would like to make, I don't think that > > > custom > > > > >> > >>>> streams RPCs are inherently a lot harder to do for the > Kafka > > > > >> Streams > > > > >> > >>>> community than "just utilizing" KIP-848, at least not as > much > > > as > > > > >> you > > > > >> > >>>> would think. Many things in KIP-848, such customizable > > > assignment > > > > >> > >>>> metadata with a metadata versioning scheme are made to look > > > like > > > > >> > >>>> general-purpose mechanisms, but are effectively mechanisms > for > > > > >> Kafka > > > > >> > >>>> Streams and also completely unimplemented at this point in > any > > > > >> > >>>> implementation of the Kafka protocol. We know that there is > > > very > > > > >> > >>>> little use of custom partition assignors in Kafka besides > Kafka > > > > >> > >>>> Streams, and we can expect the same to be true of these new > > > > >> > >>>> "customizable" features of KIP-848. 
So, besides the Kafka > > > Streams > > > > >> > >>>> community, there is no one driving the implementation of > these > > > > >> > >>>> features. "Just utilizing" KIP-848 for Kafka Streams would > > > > >> > >>>> effectively mean to implement all this, and still make a > lot of > > > > >> deep > > > > >> > >>>> changes to Kafka Streams (as mechanisms like probing > rebalances > > > > >> etc. > > > > >> > >>>> don't make sense in KIP-848). I think adapting KIP-848 to > work > > > with > > > > >> > >>>> Kafka Streams either way is a big effort. Piggybacking on > > > consumer > > > > >> > >>>> group RPCs and extending the consumer group protocol to the > > > point > > > > >> that > > > > >> > >>>> it can carry Kafka Streams on its back would not make > things > > > > >> > >>>> significantly easier. Some things, like online migration > from > > > the > > > > >> old > > > > >> > >>>> protocol to the new protocol or reimplementing metadata > > > versioning > > > > >> on > > > > >> > >>>> top of KIP-848, may even be harder in KIP-848, than in > > > KIP-1071. I > > > > >> > >>>> don't think we can consider consumer group RPCs as a > shortcut > > > to > > > > >> > >>>> revamp the streams rebalance protocol, but should be > focussing > > > on > > > > >> what > > > > >> > >>>> makes Kafka Streams work best down the road. > > > > >> > >>>> > > > > >> > >>>> Hope that makes sense! > > > > >> > >>>> > > > > >> > >>>> Cheers, > > > > >> > >>>> Lucas > > > > >> > >>>> > > > > >> > >>>> On Fri, Sep 6, 2024 at 6:47 AM Sophie Blee-Goldman > > > > >> > >>>> <sop...@responsive.dev> wrote: > > > > >> > >>>>> > > > > >> > >>>>> Thanks for another detailed response Lucas! Especially > w.r.t > > > how > > > > >> the > > > > >> > >>>> epochs > > > > >> > >>>>> are defined. I also went back and re-read KIP-848 to > refresh > > > > >> myself, > > > > >> > I > > > > >> > >>>>> guess it wasn't clear to me how much of the next-gen > consumer > > > > >> > protocol we > > > > >> > >>>>> are reusing vs what's being built from the ground up. > > > > >> > >>>>> > > > > >> > >>>>> S16. > > > > >> > >>>>> > > > > >> > >>>>>> It will become interesting when we want to include > extra > > > data > > > > >> > after > > > > >> > >>>> the > > > > >> > >>>>>> initial protocol definition. > > > > >> > >>>>> > > > > >> > >>>>> I guess that's what I'm asking us to define -- how do we > > > evolve > > > > >> the > > > > >> > >>>> schema > > > > >> > >>>>> when expanding the protocol? I think we need to set a few > > > public > > > > >> > >>>> contracts > > > > >> > >>>>> here, such as whether/which versions get bumped when. For > > > example > > > > >> if > > > > >> > I > > > > >> > >>>>> add/modify fields twice within a single Kafka release, do > we > > > still > > > > >> > need > > > > >> > >>>> to > > > > >> > >>>>> update the version? 
I think the answer is "yes" since I > know > > > you > > > > >> > >>>> Confluent > > > > >> > >>>>> folks upgrade brokers more often than AK releases, but > this is > > > > >> > different > > > > >> > >>>>> from how eg the SubscriptionInfo/AssignmentInfo of the > > > > >> > >>>>> StreamsPartitionAssignor works today, hence I want to put > all > > > > >> that in > > > > >> > >>>>> writing (for those of us who aren't familiar with how all > this > > > > >> works > > > > >> > >>>>> already and may be modifying it in the future, say, to add > > > > >> > client-side > > > > >> > >>>>> assignment :P) > > > > >> > >>>>> > > > > >> > >>>>> Maybe a quick example of what to do to evolve this schema > is > > > > >> > sufficient? > > > > >> > >>>>> > > > > >> > >>>>> S17. > > > > >> > >>>>> The KIP seems to throw the assignor, group coordinator, > topic > > > > >> > >>>>> initialization, and topology versioning all into a single > > > black > > > > >> box > > > > >> > on > > > > >> > >>>> the > > > > >> > >>>>> broker. I understand much of this is intentionally being > > > glossed > > > > >> > over as > > > > >> > >>>> an > > > > >> > >>>>> implementation detail since it's a KIP, but it would > really > > > help > > > > >> to > > > > >> > tease > > > > >> > >>>>> apart these distinct players and define their > interactions. > > > For > > > > >> > example, > > > > >> > >>>>> what are the inputs and outputs to the assignor? For > example > > > > >> could we > > > > >> > >>>> get a > > > > >> > >>>>> table like the "Data Model > > > > >> > >>>>> < > > > > >> > >>>> > > > > >> > > > > > >> > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel > > > > >> > >>>>> " > > > > >> > >>>>> in 848? > > > > >> > >>>>> > > > > >> > >>>>> KIP-848 goes into detail on these things, but again, it's > > > unclear > > > > >> > what > > > > >> > >>>>> parts of 848 are and aren't supposed to be carried > through. > > > For > > > > >> > example > > > > >> > >>>> 848 > > > > >> > >>>>> has a concept of assignor "reasons" and client-side > assignors > > > and > > > > >> > >>>> assignor > > > > >> > >>>>> metadata which can all trigger a rebalance/group epoch > bump. > > > > >> However > > > > >> > in > > > > >> > >>>> the > > > > >> > >>>>> current plan for 1071 none of these exist. Which leads me > to > > > my > > > > >> next > > > > >> > >>>>> point... > > > > >> > >>>>> > > > > >> > >>>>> S18. > > > > >> > >>>>> Many people use a managed Kafka service like Confluent > Cloud > > > where > > > > >> > >>>> brokers > > > > >> > >>>>> are on the bleeding edge but clients are upgraded less > > > frequently. > > > > >> > >>>> However, > > > > >> > >>>>> the opposite is also true for a great many users: client > > > > >> > applications are > > > > >> > >>>>> easy to upgrade and bumped often to get new features, > whereas > > > the > > > > >> > brokers > > > > >> > >>>>> (often run by a different team altogether) are considered > > > riskier > > > > >> to > > > > >> > >>>>> upgrade and stay on one version for a long time. 
We can > argue > > > > >> about > > > > >> > which > > > > >> > >>>>> case is more common -- in my personal experience it's > > > actually the > > > > >> > >>>> latter, > > > > >> > >>>>> although I only realized this after my time at Confluent > > > which was > > > > >> > >>>>> obviously skewed towards meeting Confluent Cloud users -- > but > > > > >> > regardless, > > > > >> > >>>>> we obviously need to support both cases as first-class > > > citizens. > > > > >> And > > > > >> > I'm > > > > >> > >>>> a > > > > >> > >>>>> bit concerned that there's not enough in the KIP to ensure > > > that > > > > >> > >>>> client-side > > > > >> > >>>>> assignment won't end up getting hacked into the protocol > as an > > > > >> > >>>> afterthought. > > > > >> > >>>>> > > > > >> > >>>>> Don't get me wrong, I'm still not demanding you guys > implement > > > > >> > >>>> client-side > > > > >> > >>>>> assignors as part of this KIP. I'm happy to leave the > > > > >> implementation > > > > >> > of > > > > >> > >>>>> that to a followup, but I feel we need to flesh out the > basic > > > > >> > direction > > > > >> > >>>> for > > > > >> > >>>>> this right now, as part of the v0 Streams protocol. > > > > >> > >>>>> > > > > >> > >>>>> I'm also not asking you to do this alone, and am happy to > work > > > > >> > with/for > > > > >> > >>>> you > > > > >> > >>>>> on this. I have one possible proposal sketched out > already, > > > > >> although > > > > >> > >>>> before > > > > >> > >>>>> I share that I just wanted to make sure we're aligned on > the > > > > >> > fundamentals > > > > >> > >>>>> and have mapped out the protocol in full first (ie S17). > Just > > > > >> want to > > > > >> > >>>> have > > > > >> > >>>>> a rough plan for how clients-side assignment will fit > into the > > > > >> > picture, > > > > >> > >>>> and > > > > >> > >>>>> right now it's difficult to define because it's unclear > where > > > the > > > > >> > >>>>> boundaries between the group coordinator and broker-side > > > assignor > > > > >> > are (eg > > > > >> > >>>>> what are assignor inputs and outputs). > > > > >> > >>>>> > > > > >> > >>>>> I'm committed to ensuring this doesn't add extra work or > > > delay to > > > > >> > this > > > > >> > >>>> KIP > > > > >> > >>>>> for you guys. > > > > >> > >>>>> > > > > >> > >>>>> S19. > > > > >> > >>>>> I think it could make sense to include a concept like the > > > > >> "assignor > > > > >> > >>>>> reason" from 848 as a top-level field and rebalance > trigger > > > (ie > > > > >> > group > > > > >> > >>>>> epoch bump). This would mainly be for the client-side > > > assignor to > > > > >> > >>>> trigger a > > > > >> > >>>>> rebalance, but for another example, it seems like we want > the > > > > >> > brokers to > > > > >> > >>>> be > > > > >> > >>>>> constantly monitoring warmup tasks and processing lag > updates > > > to > > > > >> know > > > > >> > >>>> when > > > > >> > >>>>> a warmup task is to be switched, which is heavy work. Why > not > > > just > > > > >> > let > > > > >> > >>>> the > > > > >> > >>>>> client who owns the warmup and knows the lag decide when > it > > > has > > > > >> > warmed up > > > > >> > >>>>> and signal the broker? 
It's a small thing for a client to > > > check > > > > >> when > > > > >> > its > > > > >> > >>>>> task gets within the acceptable recovery lag and notify > via > > > the > > > > >> > heartbeat > > > > >> > >>>>> if so, but it seems like a rather big deal for the broker > to > > > be > > > > >> > >>>> constantly > > > > >> > >>>>> computing these checks for every task on every hb. Just > food > > > for > > > > >> > thought, > > > > >> > >>>>> I don't really have a strong preference over where the > warmup > > > > >> > decision > > > > >> > >>>>> happens -- simply pointing it out that it fits very neatly > > > into > > > > >> the > > > > >> > >>>>> "assignor reason"/"rebalance requested" framework. Making > > > this a > > > > >> > >>>>> client-side decision also makes it easier to evolve and > > > customize > > > > >> > since > > > > >> > >>>>> Kafka Streams users tend to have more control over > > > > >> > upgrading/implementing > > > > >> > >>>>> stuff in their client application vs the brokers (which is > > > often a > > > > >> > >>>>> different team altogether) > > > > >> > >>>>> > > > > >> > >>>>> S20. > > > > >> > >>>>> To what extent do we plan to reuse implementations of 848 > in > > > > >> order to > > > > >> > >>>> build > > > > >> > >>>>> the new Streams group protocol? I'm still having some > trouble > > > > >> > >>>> understanding > > > > >> > >>>>> where we are just adding stuff on top and where we plan to > > > rebuild > > > > >> > from > > > > >> > >>>> the > > > > >> > >>>>> ground up. For example we seem to share most of the epoch > > > > >> > reconciliation > > > > >> > >>>>> stuff but will have different triggers for the group epoch > > > bump. > > > > >> Is > > > > >> > it > > > > >> > >>>>> possible to just "plug in" the group epoch and let the > > > consumer's > > > > >> > group > > > > >> > >>>>> coordinator figure stuff out from there? Will we have a > > > completely > > > > >> > >>>> distinct > > > > >> > >>>>> Streams coordinator? I know this is an implementation > detail > > > but > > > > >> imo > > > > >> > it's > > > > >> > >>>>> ok to get in the weeds a bit for a large and complex KIP > such > > > as > > > > >> > this. > > > > >> > >>>>> > > > > >> > >>>>> Setting aside the topology initialization stuff, is the > main > > > > >> > difference > > > > >> > >>>>> really just in how the group coordinator will be aware of > > > tasks > > > > >> for > > > > >> > >>>> Streams > > > > >> > >>>>> groups vs partitions for consumer groups? > > > > >> > >>>>> > > > > >> > >>>>> Which brings me to a high level question I feel I have to > ask: > > > > >> > >>>>> > > > > >> > >>>>> S21. > > > > >> > >>>>> Can you go into more detail on why we have to provide > Kafka > > > > >> Streams > > > > >> > its > > > > >> > >>>> own > > > > >> > >>>>> RPCs as opposed to just utilizing the protocol from > KIP-848? > > > The > > > > >> > Rejected > > > > >> > >>>>> Alternatives does address this but the main argument there > > > seems > > > > >> to > > > > >> > be in > > > > >> > >>>>> implementing a broker-side assignor. But can't you do this > > > with > > > > >> 848? 
> > > > >> > And > > > > >> > >>>>> given there are users who rarely upgrade their brokers > just as > > > > >> there > > > > >> > are > > > > >> > >>>>> users who rarely upgrade their clients, it seems silly to > do > > > all > > > > >> this > > > > >> > >>>> work > > > > >> > >>>>> and change the fundamental direction of Kafka Streams > just to > > > give > > > > >> > more > > > > >> > >>>>> control over the assignment to the brokers. > > > > >> > >>>>> > > > > >> > >>>>> For a specific example, the KIP says "Client-side > assignment > > > > >> makes it > > > > >> > >>>> hard > > > > >> > >>>>> to debug and tune the assignment logic" -- personally, I > would > > > > >> find > > > > >> > it > > > > >> > >>>>> infinitely more difficult to tune and debug assignment > logic > > > if it > > > > >> > were > > > > >> > >>>>> happening on the broker. But that's just because I work > with > > > the > > > > >> app > > > > >> > >>>> devs, > > > > >> > >>>>> not the cluster operators. The point is we shouldn't > design an > > > > >> entire > > > > >> > >>>>> protocol to give preference to one vendor or another -- > nobody > > > > >> wants > > > > >> > the > > > > >> > >>>>> ASF to start yelling at us lol D: > > > > >> > >>>>> > > > > >> > >>>>> Another example, which imo is a good one (IIUC) is that "a > > > simple > > > > >> > >>>> parameter > > > > >> > >>>>> change requires redeploying of all streams clients". > That's > > > > >> > definitely a > > > > >> > >>>>> huge bummer, but again, why would broker-side assignors > not be > > > > >> able > > > > >> > to > > > > >> > >>>>> trigger a reassignment based on a custom config in the 848 > > > > >> protocol? > > > > >> > We > > > > >> > >>>>> could still have the ha and sticky assignors implemented > on > > > the > > > > >> > brokers, > > > > >> > >>>>> and could still define all the Streams configs as broker > > > configs > > > > >> to > > > > >> > be > > > > >> > >>>>> passed into the streams custom broker assignors. That > feels > > > like a > > > > >> > lot > > > > >> > >>>> less > > > > >> > >>>>> work than redoing our own group protocol from scratch? > > > > >> > >>>>> > > > > >> > >>>>> I don't want you guys to think I'm trying to block the > entire > > > > >> > direction > > > > >> > >>>> of > > > > >> > >>>>> this KIP. I'd just like to better understand your > thoughts on > > > this > > > > >> > since > > > > >> > >>>> I > > > > >> > >>>>> assume you've discussed this at length internally. Just > help > > > me > > > > >> > >>>> understand > > > > >> > >>>>> why this huge proposal is justified (for my own sake and > to > > > > >> convince > > > > >> > any > > > > >> > >>>>> "others" who might be skeptical) > > > > >> > >>>>> > > > > >> > >>>>> > > > > >> > >>>>> On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax < > > > mj...@apache.org> > > > > >> > wrote: > > > > >> > >>>>> > > > > >> > >>>>>> I still need to catch up on the discussion in general. > But as > > > > >> > promised, > > > > >> > >>>>>> I just started KIP-1088 about the `KafkaClientSupplier` > > > question. > > > > >> > >>>>>> > > > > >> > >>>>>> Looking forward to your feedback on the new KIP. I hope > we > > > can > > > > >> get > > > > >> > >>>>>> KIP-1088 done soon, to not block this KIP. > > > > >> > >>>>>> > > > > >> > >>>>>> > > > > >> > >>>>>> -Matthias > > > > >> > >>>>>> > > > > >> > >>>>>> On 9/4/24 09:52, Lucas Brutschy wrote: > > > > >> > >>>>>>> Hi Sophie, > > > > >> > >>>>>>> > > > > >> > >>>>>>> thanks for the questions and comments! 
S1. KIP-1071 does not need to add public interfaces to streams, but it does not (yet) make the streams protocol client pluggable. Matthias promised to write a KIP that will propose a solution for the KafkaClientSupplier soon. He suggested keeping it separate from this KIP, but we can treat it as a blocker. You are right that we could try fitting an explicit topic initialization in this interface, if we decide we absolutely need it now, although I'm not sure if that would actually define a useful workflow for the users to explicitly update a topology. Explicit initialization cannot be part of admin tools, because we do not have the topology there. It could probably be a config that defaults to off, or a method on the KafkaStreams object? To me, explicit initialization opens a broad discussion on how to do it, so I would like to avoid including it in this KIP. I guess it also depends on the outcome of the discussion in S8-S10.

S2. It's hard to define the perfect default, and we should probably aim for safe operations first - since people can always up the limit. I changed it to 20.

S11. I was going to write we can add it, but it's actually quite independent of KIP-1071 and also a nice newbie ticket, so I would propose we treat it as out-of-scope and let somebody define it in a separate KIP - any discussion about it would anyway be lost in this long discussion thread.

S12. Member epoch behaves the same as in KIP-848. See the KIP-848 explanation for basics:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup

Together with the section "Reconciliation of the group", this should make clear how member epochs, group epochs and assignment epochs work in KIP-1071, but I'll try to summarize the reconciliation a bit more explicitly below:

Essentially, the broker drives a reconciliation for each member separately.
The goal is to reconcile the assignment generated by the assignor and reach the assignment epoch, but it's an asynchronous process, so each member may still be on an older epoch, which is the member epoch. We do not reinvent any of the internal machinery for streams here, we are just adding standby tasks and warm-up tasks to the reconciliation of KIP-848.

During reconciliation, the broker first asks the member to revoke all tasks that are not in its target assignment, by removing them from the current assignment for the member (which is provided in the heartbeat response). Until revocation is completed, the member remains on the old member epoch and in `UNREVOKED` state. Only once the member confirms revocation of these tasks, by removing them from the set of owned tasks in the heartbeat request, can the member transition to the current assignment epoch and get new tasks assigned. There are some tasks that may be in the (declarative) target assignment of the member generated by the assignor, but that may not be immediately added to the current assignment of the member, because they are still "unreleased". Unreleased tasks are:

- A. For any task, another instance of the same process may own the task in any role (active/standby/warm-up), and we are still awaiting revocation.

- B. For an active task, an instance of any process may own the same active task, and we are still awaiting revocation.

As long as there is at least one unreleased task, the member remains in state UNRELEASED. Once all tasks of the target assignment are added to the current assignment of the member (because they were released), the member transitions to STABLE.

Technically, the first constraint (we cannot assign a task in two different roles to the same process) is only relevant for stateful tasks that use the local state directory. I wonder if we should relax this restriction slightly, by marking sub-topologies that access the local state directory. This information could also be used by the assignor. But we can also introduce this as a follow-up improvement.
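To make the states above a bit more concrete, here is a very rough sketch of the per-member transitions. The `Task` type, the method shape and the class are made up for illustration -- this is not the actual group coordinator code:

```java
import java.util.Set;

// Rough sketch only: Task and the method shape are invented for illustration.
enum MemberState { UNREVOKED, UNRELEASED, STABLE }

record Task(String subtopologyId, int partition) { }

final class MemberReconciliationSketch {
    private int memberEpoch;
    private MemberState state = MemberState.STABLE;

    /**
     * Called for every heartbeat of the member.
     * ownedTasks      - what the member reports in the heartbeat request
     * targetTasks     - the declarative assignment produced by the assignor
     * unreleasedTasks - target tasks still owned elsewhere (cases A/B above)
     */
    MemberState onHeartbeat(Set<Task> ownedTasks,
                            Set<Task> targetTasks,
                            Set<Task> unreleasedTasks,
                            int assignmentEpoch) {
        if (!targetTasks.containsAll(ownedTasks)) {
            // Tasks outside the target assignment must be revoked first;
            // until then the member stays on its old member epoch.
            state = MemberState.UNREVOKED;
        } else if (!unreleasedTasks.isEmpty()) {
            // Revocation confirmed: advance to the assignment epoch, but some
            // target tasks are still owned by other members/processes.
            memberEpoch = assignmentEpoch;
            state = MemberState.UNRELEASED;
        } else {
            // The full target assignment can be handed out.
            memberEpoch = assignmentEpoch;
            state = MemberState.STABLE;
        }
        return state;
    }
}
```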
S13i. You are right, these bumps of the group epoch are "on top" of the KIP-848 reasons for bumping the group epoch. I added the KIP-848 reasons as well, because this was a bit confusing in the KIP.

S13ii. The group epoch is bumped immediately once a warm-up task has caught up, since this is what triggers running the assignor. However, the assignment epoch is only bumped once the task assignor has run. If the task assignment for a member hasn't changed, it will transition to the new assignment epoch immediately.

S13iii. In the case of a rolling bounce, the group epoch will be bumped for every member that leaves and joins the group. For updating rack ID, client tags, topology ID etc. -- this will mostly play a role for static membership, as you mentioned, but would cause the same thing - in a rolling bounce with static membership, we will attempt to recompute the assignment after every bounce, if the topology ID is changed in the members. This can cause broker load, as you mentioned. Note, however, that we don't have to compute an assignment for every group epoch bump. While KIP-848 computes its assignment as part of the main loop of the group coordinator, it uses asynchronous assignment in the case of client-side assignment, where the assignment is slower. So it will kick off the assignment on the client side, enter state `ASSIGNING`, but may have more group epoch bumps in the meantime while the assignment is computed on the client side. We leave it open to use the same mechanism on the broker to compute assignment asynchronously on a separate thread, in case streams assignment is too slow for the main loop of the group coordinator. We will also consider rate-limiting the assignor to reduce broker load. However, it's too early to define this in detail, as we don't know much about the performance impact yet.

S13iv. The heartbeat just sends "null" instead of the actual value in case the value didn't change since the last heartbeat, so no comparison is required. This is defined in the heartbeat RPC and the same as in KIP-848.
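Going back to S13ii for a moment, the broker-side check is really just an offset comparison per warm-up task, something along these lines (a sketch with made-up names, not the coordinator implementation):

```java
// Sketch only: illustrates the warm-up catch-up check that bumps the group epoch.
final class WarmupCatchUpSketch {
    private long groupEpoch;

    /**
     * Called when a member's heartbeat reports the changelog offset and the
     * changelog end offset for one of its warm-up tasks.
     */
    void onWarmupOffsetsReported(long reportedChangelogOffset,
                                 long changelogEndOffset,
                                 long acceptableRecoveryLag) {
        if (changelogEndOffset - reportedChangelogOffset < acceptableRecoveryLag) {
            // The warm-up task has caught up: bump the group epoch right away,
            // which is what triggers running the assignor. The assignment
            // epoch is only bumped later, once the assignor has actually run.
            groupEpoch++;
        }
    }
}
```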
S13v. I think I disagree. The group epoch represents a version of the group state, including the current topology and the metadata of all members -- the whole input to the task assignor -- and defines exactly the condition under which we attempt to recompute the assignment. So it's not overloaded, and has a clear definition. It's also exactly the same as in KIP-848, and we do not want to move away too far from that protocol, unless it's something specific to streams that needs to be changed. Also, the protocol already uses three types of epochs - member epoch, assignment epoch, group epoch. I don't think adding more epochs will make things clearer. Unless we have a strong reason why we need to add extra epochs, I'm not in favor.

S14. Assignment epoch is the same as in KIP-848:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment

Its definition does not change at all, so we didn't describe it in detail in KIP-1071.

S15. I agree that the name is a bit generic, but the time to change this was when `group.instance.id` was introduced. Also, if we start diverging from regular consumers / consumer groups in terms of naming, it will not make things clearer. "application.id" and "group.id" are bad enough, in my eyes. I hope it's okay if I just improve the field documentation.

S16. This goes back to KIP-482. Essentially, in flexible request schema versions you can define optional fields that will be encoded with a field tag on the wire, but can also be completely omitted. This is similar to how fields are encoded in, e.g., protobuf. It improves schema evolution, because we can add new fields to the schema without bumping the version. There is also a minor difference in encoding size: when encoded, tagged fields take up more bytes on the wire because of the field tag, but they can be omitted completely. It will become interesting when we want to include extra data after the initial protocol definition.
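To make that concrete, a message definition could look roughly like the fragment below. The field names are invented for illustration; the point is only how flexibleVersions, taggedVersions and tag fit together in the schema format:

```json
{
  "name": "StreamsGroupHeartbeatRequest",
  "type": "request",
  "validVersions": "0",
  // All versions are "flexible", i.e. they support tagged (optional) fields.
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+" },
    // A tagged field: omitted from the wire entirely when unset, and new
    // tagged fields can be added later without bumping the request version.
    { "name": "SomeLaterAddition", "type": "string", "versions": "0+",
      "nullableVersions": "0+", "default": "null",
      "taggedVersions": "0+", "tag": 0 }
  ]
}
```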
Cheers,
Lucas

On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:

Ah, my bad -- I thought I refreshed the page to get the latest version which is why I was a bit confused when I couldn't find anything about the new tools which I had previously seen in the KIP. Sorry for the confusion and unnecessary questions

S1.

> You could imagine calling the initialize RPC explicitly (to implement explicit initialization), but this would still mean sending an event to the background thread, and the background thread in turn invokes the RPC. However, explicit initialization would require some additional public interfaces that we are not including in this KIP.

I'm confused -- if we do all the group management stuff in a background thread to avoid needing public APIs, how do we pass all the Streams-specific and app-specific info to the broker for the StreamsGroupInitialize? I am guessing this is where we need to invoke internal APIs and is related to our discussion about removing the KafkaClientSupplier. However, it seems to me that no matter how you look at it, Kafka Streams will need to pass information to and from the background thread if the background thread is to be aware of things like tasks, offset sums, repartition topics, etc

We don't really need to rehash everything here since it seems like the proposal to add a StreamsConsumer/StreamsAssignment interface will address this, which I believe we're in agreement on. I just wanted to point out that this work to replace the KafkaClientSupplier is clearly intimately tied up with KIP-1071, and should imho be part of this KIP and not its own standalone thing.

By the way I'm happy to hop on a call to help move things forward on that front (or anything really). Although I guess we're just waiting on a design right now?

S2. I was suggesting something lower for the default upper limit on all groups.
I don't feel too strongly about it, just wanted to point out that the tradeoff is not about the assignor runtime but rather about resource consumption, and that too many warmups could put undue load on the cluster. In the end if we want to trust application operators then I'd say it's fine to use a higher cluster-wide max like 100.

S8-10 I'll get back to you on this in another followup since I'm still thinking some things through and want to keep the discussion rolling for now. In the meantime I have some additional questions:

S11. One of the main issues with unstable assignments today is the fact that assignors rely on deterministic assignment and use the process Id, which is not configurable and only persisted via local disk in a best-effort attempt. It would be a very small change to include this in KIP-1071 (like 5 LOC), WDYT? (Would even be willing to do the PR for this myself so as not to add to your load). There's a ticket for it here:
https://issues.apache.org/jira/browse/KAFKA-15190

S12. What exactly is the member epoch and how is it defined? When is it bumped? I see in the HB request field that certain values signal a member joining/leaving, but I take it there are valid positive values besides the 0/-1/-2 codes? More on this in the next question:

S13. The KIP says the group epoch is updated in these three cases:
a. Every time the topology is updated through the StreamsGroupInitialize API
b. When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
c. When a member updates its topology ID, rack ID, client tags or process ID. Note: Typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
S13.i First, just to clarify, these are not the *only* times the group epoch is bumped but rather the additional cases on top of the regular consumer group protocol -- so it's still bumped when a new member joins or leaves, etc, right?
S13.ii Second, does (b) imply the epoch is bumped just whenever the broker notices a task has finished warming up, or does it first reassign the task and then bump the group epoch as a result of this task reassignment?
S13.iii Third, (a) mentions the group epoch is bumped on each StreamsGroupInitialize API but (c) says it gets bumped whenever a member updates its topology ID. Does this mean that in the event of a rolling upgrade that changes the topology, the group epoch is bumped just once or for each member that updates its topology ID? Or does the "updating its topology ID" somehow have something to do with static membership?
S13.iv How does the broker know when a member has changed its rack ID, client tags, etc -- does it compare these values on every hb?? That seems like a lot of broker load. If this is all strictly for static membership then have we considered leaving static membership support for a followup KIP? Not necessarily advocating for that, just wondering if this was thought about already. Imo it would be acceptable to not support static membership in the first version (especially if we fixed other issues like making the process Id configurable to get actual stable assignments!)
S13.v I'm finding the concept of group epoch here to be somewhat overloaded. It sounds like it's trying to keep track of multiple things within a single version: the group membership, the topology version, the assignment version, and various member statuses (eg rack ID/client tags). Personally, I think it would be extremely valuable to unroll all of this into separate epochs with clear definitions and boundaries and triggers.
For example, I would suggest something like this:

group epoch: describes the group at a point in time, bumped when the set of member ids changes (ie static or dynamic member leaves or joins) -- triggered by group coordinator noticing a change in group membership
topology epoch: describes the topology version, bumped for each successful StreamsGroupInitialize that changes the broker's topology ID -- triggered by group coordinator bumping the topology ID (ie StreamsGroupInitialize)
(optional) member epoch: describes the member version, bumped when a memberId changes its rack ID, client tags, or process ID (maybe topology ID, not sure) -- is this needed only for static membership? Going back to question S12, when is the member epoch bumped in the current proposal?
assignment epoch: describes the assignment version, bumped when the assignor runs successfully (even if the actual assignment of tasks does not change) -- can be triggered by tasks completing warmup, a manual client-side trigger (future KIP only), etc (by definition is bumped any time any of the other epochs are bumped since they all trigger a reassignment)

S14. The KIP also mentions a new concept of an "assignment epoch" but doesn't seem to ever elaborate on how this is defined and what it's used for. I assume it's bumped each time the assignment changes and represents a sort of "assignment version", is that right? Can you describe in more detail the difference between the group epoch and the assignment epoch?

S15. I assume the "InstanceId" field in the HB request corresponds to the group.instance.id config of static membership. I always felt this field name was too generic and easy to get mixed up with other identifiers, WDYT about "StaticId" or "StaticInstanceId" to make the static membership relation more explicit?

S16. what is the "flexibleVersions" field in the RPC protocols? I assume this is related to versioning compatibility but it's not quite clear from the KIP how this is to be used.
For example if we modify the protocol by adding new fields at the end, we'd bump the version but should the flexibleVersions field change as well?

On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:

Hi Sophie,

Thanks for your detailed comments - much appreciated! I think you read a version of the KIP that did not yet include the admin command-line tool and the Admin API extensions, so some of the comments are already addressed in the KIP.

S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in the consumer background thread. Note that in the new consumer threading model, all RPCs are run by the background thread. Check out this: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design for more information. Both our RPCs are just part of the group membership management and do not need to be invoked explicitly by the application thread. You could imagine calling the initialize RPC explicitly (to implement explicit initialization), but this would still mean sending an event to the background thread, and the background thread in turn invokes the RPC. However, explicit initialization would require some additional public interfaces that we are not including in this KIP. StreamsGroupDescribe is called by the AdminClient, and used by the command-line tool kafka-streams-groups.sh.

S2. I think the max.warmup.replicas=100 suggested by Nick was intended as the upper limit for setting the group configuration on the broker. Just to make sure that this was not a misunderstanding. By default, values above 100 should be rejected when setting a specific value for a group. Are you suggesting 20 or 30 for the default value for groups, or the default upper limit for the group configuration?

S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The MemberEpoch=-1 is a left-over from an earlier discussion.
It means that the member is leaving the group, so the intention was that the member must leave the group when it asks the other members to shut down. We later reconsidered this and decided that all applications should just react to the shutdown application signal that is returned by the broker, so the client first sets the ShutdownApplication and later leaves the group. Thanks for spotting this, I removed it.

S4. Not sure if this refers to the latest version of the KIP. We added an extension of the admin API and a kafka-streams-groups.sh command-line tool to the KIP already.

S5. All RPCs for dealing with offsets will keep working with streams groups. The extension of the admin API is rather cosmetic, since the method names use "consumer group". The RPCs, however, are generic and do not need to be changed.

S6. Yes, you can use the DeleteGroup RPC with any group ID, whether streams group or not.

S7. See the admin API section.

S8. I guess for both A and B, I am not sure what you are suggesting. Do you want to make the broker-side topology immutable and not include any information about the topology, like the topology ID, in the RPC? It would seem that this would be a massive foot-gun for people, if they start changing their topology and don't notice that the broker is looking at a completely different version of the topology. Or did you mean that there is some kind of topology ID, so that at least we can detect inconsistencies between broker and client-side topologies, and we fence out any member with an incorrect topology ID? Then we seem to end up with mostly the same RPCs and the same questions (how is the topology ID generated?). I agree that the latter could be an option. See summary at the end of this message.

S9. If we flat out refuse topology updates, agree - we cannot let the application crash on minor changes to the topology.
However, if we allow topology updates as described in the KIP, there are only upsides to making the topology ID more sensitive. All it will cause is that a client will have to resend a `StreamsGroupInitialize`, and during the rolling bounce, older clients will not get tasks assigned.

S10. The intention in the KIP is really just this - old clients can only retain tasks, but not get new ones. If the topology has sub-topologies, these will initially be assigned to the new clients, which also have the new topology. You are right that rolling an application where the old structure and the new structure are incompatible (e.g. different subtopologies access the same partition) will cause problems. But this will also cause problems in the current protocol, so I'm not sure it's strictly a regression - it's just unsupported (which we can only make the best effort to detect).

In summary, for the topology ID discussion, I mainly see two options:
1) stick with the current KIP proposal
2) define the topology ID as the hash of the StreamsGroupInitialize topology metadata only, as you suggested, and fence out members with an incompatible topology ID.
I think doing 2 is also a good option, but in the end it comes down to how important we believe topology updates (where the set of sub-topologies or set of internal topics are affected) to be. The main goal is to make the new protocol a drop-in replacement for the old protocol, and at least define the RPCs in a way that we won't need another KIP which bumps the RPC version to make the protocol production-ready. Adding more command-line tools, or public interfaces, seems less critical as it doesn't change the protocol and can be done easily later on. The main question becomes - is the protocol production-ready and sufficient to replace the classic protocol if we go with 2?

Cheers,
Lucas