Hi Andrew, thanks for the comments!
You are right, we haven't fully caught up with the changes that were made in the final version of KIP-1043. I agree with all your points, and have made the corresponding changes in the KIP. Cheers, Lucas On Tue, Dec 3, 2024 at 6:51 PM Andrew Schofield <andrew_schofield_j...@outlook.com> wrote: > > Hi Bruno/Lucas, > A few comments related to consistency with the other KIPs. > As we now have multiple types of group, we need to work > out how to handle listing, describing and so on in a consistent way. > > AS14: KIP-1043 introduced Admin#listGroups and this is intended as the > way to list groups of all types. As a result, the specific methods and > classes which applied to just one type of group have been removed. > > Admin#listShareGroups does not exist (any more); > Admin#listGroups with a filter on group type can be used instead. > There is no ShareGroupListing, ListShareGroupsOptions or > ListShareGroupsResult either. > > All of the equivalent stuff for consumer groups is being deprecated > and removed by KIP-1043 (probably in AK 4.1). > > I suggest that you remove Admin#listStreamsGroups also. It is not > necessary because Admin#listGroups can do it. > > AS15: Similarly, there is no ShareGroupState any longer, just > GroupState. A streams group has a specific state, NOT_READY. > I recommend that you add this state to GroupState, and > use GroupState instead of having StreamsGroupState. > > During the development of KIP-1043, it became clear that > having enumerations for all of the group types separately > was cumbersome and it prevented a state-based filter on > Admin#listGroups. > > AS16: The o.a.k.common.GroupType for streams should be STREAMS > not Streams. > > AS17: Please add streams groups to bin/kafka-groups.sh. > > Thanks, > Andrew > ________________________________________ > From: Sophie Blee-Goldman <sop...@responsive.dev> > Sent: 20 November 2024 08:15 > To: dev@kafka.apache.org <dev@kafka.apache.org> > Subject: Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol > > Thanks Lucas! That all sounds good to me. > > I think I officially have nothing left to say or ask about this KIP, so > once you've > updated the doc with what we've discussed in the past few messages then > I'm personally feeling ready to vote on it. > > On Tue, Nov 19, 2024 at 11:10 PM Lucas Brutschy > <lbruts...@confluent.io.invalid> wrote: > > > Hi Sophie, > > > > S1. Yes, we'll include a list of missing topics or topic > > misconfigurations in the heartbeat response (StatusDetail). Of course, > > we will not only expose this via the streams group state metric, which > > can be monitored, but will also log the state of the group and the status > > detail on the client, so it should be pretty straightforward for new > > users to figure out the problem. > > > > As for the streams group state listener - the streams group state is > > sent with the heartbeat response, so it's not directly possible to > > implement it. Maybe it would make more sense to define the listener on > > the `status` code sent in the heartbeat response, which includes > > member-specific states (like STALE_TOPOLOGY). But yes, let's do this > > in a separate KIP. I'll create a ticket for it (once we have accepted > > this KIP and start creating tickets for KIP-1071). > > > > S2. Yes, the config `topology.epoch` would go to StreamsConfig. > > StreamsGroupDescribe RPC would return the current epoch.
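To make the client-side half of S2 concrete, a minimal sketch of what bumping the proposed config could look like. Note that "topology.epoch" is only the key proposed in this thread, not a released StreamsConfig constant, so treat this as an illustration of the idea rather than an existing API:

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;

    public class TopologyEpochBumpExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            // Proposed (not yet released) config: bump it in lockstep with any topology change.
            props.put("topology.epoch", 2);

            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic");

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
        }
    }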
As Bruno > > suggested, the default of the `topology.epoch` would be 0 on the > > client and the epoch would start at -1 on the broker (or 1 and 0, if > > people prefer). So to first initialize a group, no config change would > > be required. People who reset their application or change the > > application ID upon redeploys would never have to touch the config. > > > > I think we don't need the topology ID to validate that the user did > > not change the topology without bumping the epoch. We keep the > > topology and topology epoch immutable during a client's lifetime, so > > it's enough that we validate this once when joining. When the client > > joins, we can check whether the topology_epoch & topology match the > > broker-side. All following heartbeats don't include the topology > > anymore, but we do not need to "reverify" that the topology epoch & > > topology are consistent, it is enough to compare epochs. > > > > Cheers, > > Lucas > > > > On Wed, Nov 20, 2024 at 4:56 AM Sophie Blee-Goldman > > <sop...@responsive.dev> wrote: > > > > > > Thanks Lucas! A few more questions to nail down the details but on the > > > whole I think this is a good plan > > > > > > S1. I definitely agree that the current behavior is inconsistent and not > > > ideal for missing topics. So I'm fine with changing this, just wanted to > > > make sure it was intentional. That said I do think it would be a good > > > exercise to think through the user experience here and what it will look > > > like in practice for someone to debug why their app is not doing anything > > > due to missing source topics. Especially because in my experience this > > > tends to happen most with relatively new users who are just trying out > > > Kafka Streams eg via the quickstart, or running and debugging in their > > test > > > environment, etc. Asking them to monitor the group state might be a lot > > for > > > some folks, especially if that has to be done via a separate cli they > > > probably don't even know about. But I do think you're right that on the > > > whole, the application should stay up and running and not immediately die > > > if topics are missing. It sounds like we do plan to log a warning when > > this > > > happens as detected by the heartbeat, which helps. Would be even better > > if > > > we could return a list of the missing topic names in the heartbeat and > > > include that in the logs. > > > > > > Beyond that, what if we introduced a new kind of listener similar to the > > > KafkaStreams.State listener, but instead of listening in on the state of > > > the client it listens on the state of the Streams group? This could be > > done > > > in a followup KIP of course, just pointing out that we can improve on > > this > > > in the future without modifying anything in the current KIP. I'd prefer > > > this over introducing a timeout for missing topics,, personally. (If you > > > don't want to include it in the current KIP let's just file a ticket for > > it > > > perhaps?) > > > > > > S2. Ok just to hammer out the details, we're proposing that the topology > > > epoch be configurable via a new StreamsConfig, is that right? And that > > > there will be an Admin API that can be used to fetch the current topology > > > epoch for the group? > > > > > > How will this work if a user doesn't set the topology.epoch config? Do we > > > just initialize it to 0 and assume that the topology will always be the > > > same until/unless the user manually bumps it by the StreamsConfig? 
Would > > we > > > maybe want to keep the topology ID around so that we can verify the > > > topology ID remains the same and matches for a given topology epoch? Or > > > should we just leave it up to users to not screw with their topology > > > without bumping the topology epoch? To be fair, that's more or less what > > we > > > do now, but it feels like if we introduce a concept of topology version > > > then we should try to enforce it somehow. I'm not sure, what are your > > > thoughts here? > > > > > > Or does each client send the entire topology in each heartbeat? That kind > > > of seems like a lot of data to be passing around that often. We could > > > replace the member.topology with member.topology_ID for purposes of > > > verification (eg detecting the STREAMS_INVALID_TOPOLOGY_EPOCH case) and > > get > > > verification on every heartbeat without all the extra bytes. Probably not > > > an issue most of the time but people can run some truly huge topologies. > > > > > > On Tue, Nov 19, 2024 at 2:47 AM Lucas Brutschy > > > <lbruts...@confluent.io.invalid> wrote: > > > > > > > Hi Sophie, > > > > > > > > S1. You are reading it correctly. We added NOT_READY to bridge the > > > > time when internal topics are being created. Right now, the > > > > StreamsPartitioner just blocks until the internal topics are created, > > > > but we don't want to block in the broker. We want the client to be > > > > able to join while the internal topics may not be created, yet. > > > > > > > > We then went on to simplify the story about topic / topology > > > > mismatches to treat all such errors the same way - we enter NOT_READY, > > > > but keep the application running, and indicate the problem in the > > > > heartbeats. The idea is that the user just monitors the status of the > > > > group, which is exposed as a metric. > > > > > > > > We can try to make the behavior more like the StreamsPartitioner, but > > > > its behavior is quite inconsistent. For example, missing source topics > > > > are handled via an error code which shut down the application, but > > > > problems when creating the internal topics, or topics not being > > > > copartitioned, we just let TopologyException/TimeoutException fall > > > > through (which I believe fails the group leader only). > > > > > > > > We could consider shutting down the application upon missing / > > > > mispartitioned internal or source topics. I wonder if the simplest way > > > > to do that would be to introduce a timeout on the client side. I think > > > > the internal topic manager right now uses something like > > > > max.poll.interval*2. But I'm not sure if shutting down the application > > > > really simplifies things. > > > > > > > > S2. Let me try to add some details to the Bruno's proposal. Manually, > > > > to upgrade a topology, the user would just deploy a new version of the > > > > application, which has the new topology and a configuration change (in > > > > the code or wherever) and deploy it. If the user doesn't know the > > > > current topology epoch, they could fetch it using the streams groups > > > > command-line tool, or potentially a UI. Automating in CD would work > > > > the same way - determine the current topology epoch using admin API or > > > > the command-line tool, set the config (by changing a file, or > > > > launchdarkly or whatever your setup is) and deploy. It's definitely > > > > not completely trivial, but it is an advanced feature, after all. 
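As a sketch of what the automated variant could look like: the describe call and the epoch accessor below follow the shape proposed in KIP-1071 but do not exist in a released Admin client yet, so they are left as comments and should be read as assumptions.

    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    public class FetchCurrentTopologyEpoch {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Hypothetical API shape per the KIP proposal (not in any released client):
                // final int currentEpoch = admin.describeStreamsGroups(List.of("my-streams-app"))
                //     .describedGroups().get("my-streams-app").get().topologyEpoch();
                // A CD pipeline would then deploy with topology.epoch = currentEpoch + 1.
            }
        }
    }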
I > > > > made a rough sketch of error handling for topology epochs, I'll put > > > > this in the "appendix" below since it's quite detailed. > > > > > > > > S3. If we are going for topology epochs, which is a nice proposal, I > > > > am not sure if we actually need to have the topology ID anymore - > > > > since we can store the topology epoch that each member is currently > > > > at, and we cannot change the topology without bumping the epoch, the > > > > point of the ID (detecting that a member is working with a stale > > > > topology) can be replaced by comparing topology epochs. So if we go > > > > down this route, I’d suggest removing topology IDs. We could introduce > > > > them for naming purposes, but they are not required by the protocol, > > > > so I'd rather do that later, if ever. > > > > > > > > Cheers, Lucas > > > > > > > > ## Appendix: error handling for topology epochs. > > > > > > > > We’ll require some new/changed error codes / statuses to handle the > > > > updates correctly. > > > > > > > > * Non-fatal status INCONSISTENT_TOPOLOGY is renamed to > > > > STALE_TOPOLOGY, since we now have something like a topology > > > > chronology. It is returned for members who are already part of the > > > > group, but at an old topology epoch > > > > * Fatal error STREAMS_INVALID_TOPOLOGY_EPOCH if, for example, the > > > > topology was changed by the epoch not bumped. > > > > * Fatal error STREAMS_TOPOLOGY_FENCED if we are trying to join with > > > > an outdated topology. > > > > > > > > So then all possible cases should be handled like this: > > > > > > > > * When a member joins (heartbeat.member_epoch = 0) > > > > ** Require both heartbeat.topology and heartbeat.topology_epoch to be > > set. > > > > ** If heartbeat.topology_epoch = group.topology_epoch + 1, the member > > > > joins and updates topology ID/metadata/epoch for the group. > > > > ** If heartbeat.topology_epoch = group.topology_epoch and > > > > heartbeat.topology = group.topology, the member joins without updating > > > > the topology ID/metadata/epoch for the group. > > > > ** If heartbeat.topology_epoch = group.topology_epoch but > > > > heartbeat.topology != group.topology, we will fail with error > > > > STREAMS_INVALID_TOPOLOGY_EPOCH. > > > > ** If heartbeat.topology_epoch > group.topology_epoch + 1, we fail > > > > with new error STREAMS_INVALID_TOPOLOGY_EPOCH > > > > ** If heartbeat.topology_epoch < group.topology_epoch, we fail with > > > > STREAMS_TOPOLOGY_FENCED > > > > * Otherwise (if heartbeat.member_epoch != 0) > > > > ** Require that none of heartbeat.topology, heartbeat.topology_epoch > > > > are set in the request, and let member.topology_epoch be the topology > > > > epoch stored in the member metadata on the broker for that member. > > > > * If member.topology_epoch < group.topology_epoch, the heartbeat is > > > > successful, but we set the state STALE_TOPOLOGY. > > > > * If member.topology_epoch > group.topology_epoch, should be > > impossible. > > > > * If member.topology_epoch = group.topology_epoch, the heartbeat is > > > > successful. 
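Condensed into code, the join-time cases above look roughly like this. This is a sketch only; the names are taken from the proposal in this thread, not from existing group-coordinator code:

    import java.util.Arrays;

    public class TopologyEpochJoinCheck {
        enum Result { JOIN_AND_UPDATE_TOPOLOGY, JOIN, STREAMS_INVALID_TOPOLOGY_EPOCH, STREAMS_TOPOLOGY_FENCED }

        static Result onJoin(final int hbEpoch, final byte[] hbTopology,
                             final int groupEpoch, final byte[] groupTopology) {
            if (hbEpoch == groupEpoch + 1) {
                return Result.JOIN_AND_UPDATE_TOPOLOGY;          // initialize or upgrade the group's topology
            } else if (hbEpoch == groupEpoch) {
                return Arrays.equals(hbTopology, groupTopology)
                    ? Result.JOIN                                // same epoch, same topology
                    : Result.STREAMS_INVALID_TOPOLOGY_EPOCH;     // topology changed without bumping the epoch
            } else if (hbEpoch > groupEpoch + 1) {
                return Result.STREAMS_INVALID_TOPOLOGY_EPOCH;    // epoch jumped ahead by more than one
            } else {
                return Result.STREAMS_TOPOLOGY_FENCED;           // joining with an outdated epoch
            }
        }
    }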
> > > > > > > > Common cases that could happen in practice (each of them should have a > > > > different status / error code to be distinguishable): > > > > > > > > * When a user rolls the application with a new topology with a > > > > different topology, but without bumping the topology epoch, the roll > > > > of the first client would directly fail with > > > > STREAMS_INVALID_TOPOLOGY_EPOCH, which would be treated as fatal and > > > > cause the first client to not come up - which cause the user to > > > > correctly detect the error. > > > > * When a user rolls the application with a new topology epoch / > > topology > > > > ** The first client being rolled will bump the topology epoch > > > > ** All other clients will see that they are using an outdated > > > > topology by receiving a non-fatal STALE_TOPOLOGY error > > > > ** If any of the stale clients have to rejoin the group before being > > > > updated to the new topology, they will fail fatally with > > > > STREAMS_TOPOLOGY_FENCED. > > > > > > > > > > > > On Mon, Nov 18, 2024 at 12:53 PM Nick Telford <nick.telf...@gmail.com> > > > > wrote: > > > > > > > > > > Actually, scratch that. On reflection I think I prefer Bruno's > > original > > > > > idea to specify it in the configuration. > > > > > > > > > > Cheers, > > > > > Nick > > > > > > > > > > On Sat, 16 Nov 2024 at 17:59, Nick Telford <nick.telf...@gmail.com> > > > > wrote: > > > > > > > > > > > Hey everyone, > > > > > > > > > > > > With respect to Bruno's proposal, could instances cache their > > topology > > > > > > epoch on disk, and then upgrades/downgrades would simply involve > > > > deleting > > > > > > the cached epoch before starting the instance? > > > > > > > > > > > > My thinking is that this might be potentially simpler for users > > than > > > > > > modifying configuration. > > > > > > > > > > > > Cheers, > > > > > > Nick > > > > > > > > > > > > On Sat, 16 Nov 2024, 00:41 Sophie Blee-Goldman, < > > sop...@responsive.dev > > > > > > > > > > > wrote: > > > > > > > > > > > >> Thanks for the updates! Few minor questions before we wrap this > > up and > > > > > >> move > > > > > >> to a vote: > > > > > >> > > > > > >> S1. Can you clarify the outward-facing behavior of a group that > > > > enters the > > > > > >> NOT_READY state due to, say, missing source topics? It sounds > > like the > > > > > >> application will continue to run without processing anything, > > which > > > > would > > > > > >> be a change compared to today where we shut down with an explicit > > > > error. > > > > > >> Am > > > > > >> I reading this correctly and is this change intentional? > > > > > >> > > > > > >> S2. Regarding topology upgrades, I am fully in favor of Bruno's > > > > proposal. > > > > > >> Just feel like we need to flesh it out a bit. For example you > > say "no > > > > > >> specific tooling" is needed, but how would a user go about > > upgrading > > > > the > > > > > >> topology? If we can just describe this process in a bit more > > detail > > > > then I > > > > > >> would feel good. > > > > > >> > > > > > >> S3. Lastly, just a quick reminder, we should make sure to clarify > > in > > > > the > > > > > >> KIP how the topology ID will be computed since that impacts which > > > > kind of > > > > > >> upgrades are possible and is therefore part of the public API. 
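For illustration only, a topics-derived ID could be as simple as a digest over the sub-topologies' source topics. The KIP does not prescribe any particular algorithm, so the hash input and encoding below are assumptions:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Base64;
    import java.util.List;
    import java.util.SortedMap;

    public class TopologyIdSketch {
        static String topologyId(final SortedMap<String, List<String>> sourceTopicsBySubtopology) throws Exception {
            final MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (final var entry : sourceTopicsBySubtopology.entrySet()) {
                digest.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
                for (final String topic : entry.getValue()) {
                    digest.update(topic.getBytes(StandardCharsets.UTF_8));
                }
            }
            return Base64.getEncoder().encodeToString(digest.digest());
        }
    }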
> > IIUC > > > > the > > > > > >> plan is to compute it from the topics which makes sense to me > > (though > > > > I do > > > > > >> think it should eventually be pluggable, perhaps in a followup > > KIP) > > > > > >> > > > > > >> On Thu, Nov 7, 2024 at 10:08 AM Bruno Cadonna <cado...@apache.org > > > > > > > wrote: > > > > > >> > > > > > >> > Hi Lucas and all others, > > > > > >> > > > > > > >> > Thanks for the proposals regarding the topology upgrades! > > > > > >> > > > > > > >> > I have an additional proposal: > > > > > >> > > > > > > >> > We introduce a topology epoch alongside the topology ID. The > > > > topology > > > > > >> > epoch is set to 0 on the group coordinator for a new group. The > > > > topology > > > > > >> > epoch is also a config within the Streams client, the default > > is 1. > > > > > >> > > > > > > >> > When the Streams client starts up, it sends its topology epoch > > > > alongside > > > > > >> > the topology ID and the topology to the group coordinator. The > > group > > > > > >> > coordinator initializes the topology with the given topology ID > > and > > > > > >> > increases the topology epoch to 1. > > > > > >> > > > > > > >> > When the client wants to upgrade a topology it needs to send a > > valid > > > > > >> > topology epoch alongside the new topology ID and the topology. > > We > > > > can > > > > > >> > decide if the new topology epoch = current topology at group > > > > coordinator > > > > > >> > + 1 or just new topology epoch > current topology on group > > > > coordinator. > > > > > >> > > > > > > >> > When the group coordinator receives a topology ID that differs > > from > > > > the > > > > > >> > one intialized for the group, the group coordinator only > > upgrades > > > > to the > > > > > >> > new topology if the topology epoch is valid. Then, it sets the > > > > topology > > > > > >> > epoch to the new value. If the topology epoch is not valid, that > > > > means > > > > > >> > the topology ID is an old one and the group coordinator does not > > > > > >> > re-initialize the topology. > > > > > >> > > > > > > >> > The current topology epoch can be retrieved with the describe > > RPC. > > > > > >> > > > > > > >> > Accidental rollbacks of topologies are not be possible. Only > > wanted > > > > > >> > rollbacks of topologies with an appropriate topology epoch are > > > > allowed. > > > > > >> > > > > > > >> > Specific tooling is not needed. > > > > > >> > > > > > > >> > The topology upgrade can be automated in CI in the following > > way: > > > > > >> > > > > > > >> > 1. Call streams group describe to determine the current epoch. > > > > > >> > > > > > > >> > 2. Set the Streams client config topology.epoch = current epoch > > + 1 > > > > > >> > > > > > > >> > 3. Roll the application. > > > > > >> > > > > > > >> > WDYT? > > > > > >> > > > > > > >> > Best, > > > > > >> > Bruno > > > > > >> > > > > > > >> > > > > > > >> > On 07.11.24 18:26, Lucas Brutschy wrote: > > > > > >> > > Hi all, > > > > > >> > > > > > > > >> > > I have updated the KIP with some details around how we handle > > the > > > > > >> > > cases when essential topics required by the topology are not > > > > present. > > > > > >> > > This is described in a new section "Handling topic topology > > > > > >> > > mismatches". 
The short summary is that we enter a state where > > the > > > > > >> > > group member's heartbeats will succeed, but no assignments > > will be > > > > > >> > > made, and the nature of the topic misconfiguration is > > > > communicated to > > > > > >> > > the members using the `status` field of the heartbeat > > response. > > > > > >> > > > > > > > >> > > Working towards putting up this KIP up for a vote, there was > > one > > > > last > > > > > >> > > concern mentioned during our last sync, which was a concern > > > > raised by > > > > > >> > > Sophie and others around topology updating. I would like to > > > > propose > > > > > >> > > the following changes to the KIP to solidify the story around > > > > topology > > > > > >> > > updating. > > > > > >> > > > > > > > >> > > To recap, a member sends its topology metadata every time it > > > > attempts > > > > > >> > > to join a streams group. When the group already has a topology > > > > set, > > > > > >> > > the KIP proposes to update the topology for the group > > immediately > > > > with > > > > > >> > > that first heartbeat of a member. All other members will then > > > > receive > > > > > >> > > a non-fatal group status INCONSISTENT_TOPOLOGY, letting them > > know > > > > that > > > > > >> > > their topology is not equal to the groups topology anymore. > > Those > > > > > >> > > members with an outdated topology will be able to retain their > > > > current > > > > > >> > > tasks, but will not receive new tasks, until they have also > > > > updated > > > > > >> > > their topology. This enables a rolling bounce to upgrade the > > > > topology. > > > > > >> > > > > > > > >> > > There is a concern with this behavior, raised both in our > > > > discussions > > > > > >> > > but also by Guozhang and Sophie. The problem is that a > > > > “misbehaving” > > > > > >> > > member can always roll back a topology upgrade, if it is still > > > > running > > > > > >> > > with the old version of the topology. For example, if we roll > > 20 > > > > > >> > > members to a new topology version, and we have successfully > > > > restarted > > > > > >> > > 19 out of 20, but the last member is restarted with the old > > > > topology, > > > > > >> > > this will cause the topology version on the broker to be > > rolled > > > > back. > > > > > >> > > This could happen, for example, because topology update fails > > on > > > > the > > > > > >> > > last member, or the last member runs into a fatal error of > > some > > > > kind > > > > > >> > > (out-of-memory or being fenced, for example). Once the failed > > > > member > > > > > >> > > rejoins, it updates the topology on the broker to the “old” > > > > topology, > > > > > >> > > so performs an involuntary roll-back. Now, we would be in a > > > > situation > > > > > >> > > where 19 out of 20 members are blocked from getting new tasks > > > > > >> > > assigned. We would be in this state just temporary, until the > > > > final, > > > > > >> > > misbehaving member is restarted correctly with the new > > topology. > > > > > >> > > > > > > > >> > > The original suggestion in the KIP discussion to solve these > > > > issues > > > > > >> > > (for now) was to exclude topology updates from the KIP. > > However, > > > > it > > > > > >> > > was confirmed by several people (such as Anna) that topology > > > > updates > > > > > >> > > are quite common in the wild, so for having a production-ready > > > > > >> > > implementation of the new protocol, a mechanism for topology > > > > updates > > > > > >> > > is required. 
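Coming back briefly to the `status` field mentioned at the top of this message, a rough sketch of what a member might do with it. The enum values and response shape here are assumptions; the real names are defined by the KIP's RPC schema:

    import java.util.List;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class HeartbeatStatusHandling {
        private static final Logger LOG = LoggerFactory.getLogger(HeartbeatStatusHandling.class);

        enum GroupStatus { READY, MISSING_SOURCE_TOPICS, MISSING_INTERNAL_TOPICS, STALE_TOPOLOGY }

        static void onHeartbeatResponse(final GroupStatus status, final List<String> statusDetail) {
            if (status != GroupStatus.READY) {
                // Keep the application running, but make the misconfiguration visible to the operator.
                LOG.warn("Streams group not ready ({}): {}", status, statusDetail);
            }
        }
    }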
It is not clear how strict the requirement for > > > > rolling > > > > > >> > > updates in the first version of the protocol. No ‘better’ > > > > alternative > > > > > >> > > for updating topologies was suggested. > > > > > >> > > > > > > > >> > > Proposal 1) Making topology updating optional: We modify the > > > > heartbeat > > > > > >> > > request to allow joining a group without sending a topology > > (only > > > > a > > > > > >> > > topology ID). When joining a group this way, the member is > > > > explicitly > > > > > >> > > requesting to not update the topology of the group. In other > > > > words, > > > > > >> > > leaving the topology as null, will mean that no topology > > update > > > > will > > > > > >> > > be performed on the broker side. If the topology ID does not > > > > match the > > > > > >> > > topology ID of the group, and no topology update is performed, > > > > and the > > > > > >> > > member that joined will just behave like any other member > > with an > > > > > >> > > inconsistent topology ID. That is, it will not get any tasks > > > > assigned. > > > > > >> > > Streams clients will never send the topology when they know > > they > > > > are > > > > > >> > > just rejoining the group, e.g. because the client was fenced > > and > > > > > >> > > dropped out of the group. That means, a topology update is > > only > > > > > >> > > initiated when a streams application is first started up. This > > > > will > > > > > >> > > prevent "accidental" topology downgrades if we know that we > > are > > > > > >> > > rejoining the group. However, if we rejoin the group without > > > > knowing > > > > > >> > > that we are just rejoining (e.g. since the whole pod was > > > > restarted), > > > > > >> > > we could still run into temporary topology downgrades. > > > > > >> > > > > > > > >> > > Proposal 2) Disable topology updates by default on the client: > > > > > >> > > Second, we can disable topology updates by default from the > > client > > > > > >> > > side, by extending the streams configuration with a config > > > > > >> > > enable_topology_updates which defaults to false and extending > > the > > > > > >> > > heartbeat RPC by a flag UpdateTopology which propagates that > > > > config > > > > > >> > > value to the broker. The advantage of having a client-side > > config > > > > is, > > > > > >> > > that, during a roll, I can set the configuration only for > > updated > > > > > >> > > clients, which can ensure that no accidental rollbacks > > happen. The > > > > > >> > > downside is that after the roll, I would have to do a second > > roll > > > > to > > > > > >> > > disable enable_topology_updates again, so that we are safe > > during > > > > the > > > > > >> > > next roll again. However, avoiding the second roll is more of > > a > > > > > >> > > usability optimization, which is conceptually more complex and > > > > could > > > > > >> > > address in a follow-up KIP, as in proposal 3. > > > > > >> > > > > > > > >> > > Proposal 3) Fixing the topology ID during upgrades (follow-up > > > > KIP): To > > > > > >> > > fully prevent accidental downgrades, one could introduce a > > > > broker-side > > > > > >> > > group-level config that restricts the possible topology IDs > > that a > > > > > >> > > client can upgrade to. This would be quite similar in safety > > as > > > > making > > > > > >> > > topology updates manual. 
For updating a topology, the user > > would > > > > have > > > > > >> > > to first, extract the topology ID from the update topology - > > this > > > > > >> > > could e.g. be logged when the application is started, second, > > set > > > > the > > > > > >> > > group-level config using an admin client (something like > > > > > >> > > group.streams.allowed_upgrade_topology_id), third, roll the > > > > > >> > > application. This would require quite some extra tooling (how > > to > > > > get > > > > > >> > > the topology ID), so I’d propose introducing this in a > > follow-up > > > > KIP. > > > > > >> > > > > > > > >> > > Let me know if there are any concerns with this approach. > > > > > >> > > > > > > > >> > > Cheers, > > > > > >> > > Lucas > > > > > >> > > > > > > > >> > > On Mon, Oct 7, 2024 at 9:42 AM Bruno Cadonna < > > cado...@apache.org> > > > > > >> wrote: > > > > > >> > >> > > > > > >> > >> Hi all, > > > > > >> > >> > > > > > >> > >> we did some major changes to this KIP that we would like the > > > > > >> community > > > > > >> > >> to review. The changes are the following: > > > > > >> > >> > > > > > >> > >> 1. We merged the Streams group initialize RPC into the > > Streams > > > > group > > > > > >> > >> heartbeat RPC. We decided to merge them because it > > simplifies the > > > > > >> > >> sychronization between initialization and heartbeat during > > group > > > > > >> > >> creation and migration from a classic group. A consequence > > of the > > > > > >> merge > > > > > >> > >> is that the heartbeat now needs permissions to create and > > > > describe > > > > > >> > >> topics which before only the initialize call had. But we > > think > > > > that > > > > > >> is > > > > > >> > >> not a big deal. A consumer of a Streams client will send the > > > > > >> metadata of > > > > > >> > >> its topology in its first heartbeat when it joins the group. > > > > After > > > > > >> that > > > > > >> > >> it will not send the topology metadata anymore. > > > > > >> > >> > > > > > >> > >> 2. We discovered that to create the internal topics, the > > topology > > > > > >> > >> metadata also need to include the co-partition groups since > > we > > > > cannot > > > > > >> > >> infer the co-partitioning solely from the sub-topologies and > > the > > > > > >> input > > > > > >> > >> topics of the sub-topologies. > > > > > >> > >> > > > > > >> > >> 3. We added a section that describes how the migration from a > > > > classic > > > > > >> > >> group to a streams group and back works. This is very > > similar to > > > > how > > > > > >> the > > > > > >> > >> migration in KIP-848 works. > > > > > >> > >> > > > > > >> > >> > > > > > >> > >> Please give us feedback about the changes and the KIP in > > general. > > > > > >> > >> > > > > > >> > >> We would like to start a vote on this KIP as soon as > > possible. > > > > > >> > >> > > > > > >> > >> Best, > > > > > >> > >> Bruno > > > > > >> > >> > > > > > >> > >> On 11.09.24 01:06, Sophie Blee-Goldman wrote: > > > > > >> > >>> Just following up to say thank you Lucas for the detailed > > > > > >> > explanations, and > > > > > >> > >>> especially the thorough response to my more "existential" > > > > questions > > > > > >> > about > > > > > >> > >>> building off 848 vs introducing a separate Streams group > > > > protocol. > > > > > >> This > > > > > >> > >>> really helped me understand the motivations behind this > > > > decision. No > > > > > >> > notes! 
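On point 1 of Bruno's October update quoted above, the heartbeat taking over topic creation means the application's principal needs CREATE and DESCRIBE on its topics. A sketch using the existing Admin ACL API; the principal name and topic prefix are placeholders, and exactly which principal needs which ACL is a detail of the KIP:

    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    public class GrantStreamsTopicAcls {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            try (Admin admin = Admin.create(props)) {
                // Internal topics are prefixed with the application id, so a PREFIXED pattern covers them.
                final ResourcePattern appTopics =
                    new ResourcePattern(ResourceType.TOPIC, "my-streams-app-", PatternType.PREFIXED);
                final List<AclBinding> acls = List.of(
                    new AclBinding(appTopics, new AccessControlEntry(
                        "User:streams-app", "*", AclOperation.CREATE, AclPermissionType.ALLOW)),
                    new AclBinding(appTopics, new AccessControlEntry(
                        "User:streams-app", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)));
                admin.createAcls(acls).all().get();
            }
        }
    }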
> > > > > >> > >>> > > > > > >> > >>> On Fri, Sep 6, 2024 at 7:17 AM Lucas Brutschy > > > > > >> > >>> <lbruts...@confluent.io.invalid> wrote: > > > > > >> > >>> > > > > > >> > >>>> Hi Sophie, > > > > > >> > >>>> > > > > > >> > >>>> thanks for getting deeply involved in this discussion. > > > > > >> > >>>> > > > > > >> > >>>> S16. Adding tagged fields would not require an RPC version > > > > bump at > > > > > >> > >>>> all. This should already a cover a lot of use cases where > > > > > >> requestion > > > > > >> > >>>> versioning wouldn't become an issue, as long as it is safe > > to > > > > > >> ignore > > > > > >> > >>>> the new fields. Other than that, I'm not aware of a formal > > > > > >> definition > > > > > >> > >>>> of the what request version bumps mean in relation to > > Apache > > > > Kafka > > > > > >> > >>>> versions - if anybody knows, please let me know. In my > > > > > >> understanding, > > > > > >> > >>>> the Kafka protocol evolves independently of AK versions, > > and > > > > for > > > > > >> the > > > > > >> > >>>> Kafka protocol to change, you require an accepted KIP, and > > not > > > > an > > > > > >> AK > > > > > >> > >>>> release. The grey area is that a protocol change without > > an AK > > > > > >> release > > > > > >> > >>>> does not mean much, because even if cloud vendors deploy it > > > > before > > > > > >> an > > > > > >> > >>>> AK release, virtually no clients would support it and the > > > > protocol > > > > > >> > >>>> would fall back to the earlier RPC version. So maybe this > > isn't > > > > > >> that > > > > > >> > >>>> clearly formalized because it does not matter much in > > practice. > > > > > >> > >>>> Normally, I would expect an RPC version bump after each KIP > > > > that > > > > > >> > >>>> breaks RPC compatibility. But there seems to be some > > > > flexibility > > > > > >> here > > > > > >> > >>>> - for example, the KIP-848 RPCs were changed after the > > fact in > > > > > >> trunk, > > > > > >> > >>>> because they were not exposed yet by default in AK (or any > > > > other > > > > > >> > >>>> implementation of the Kafka protocol), so a breaking change > > > > without > > > > > >> > >>>> request version bump was probably not deemed problematic. > > > > > >> > >>>> > > > > > >> > >>>> S17/18. We can add an informational table for the > > interface to > > > > the > > > > > >> > >>>> assignor, and that should also make clear how a client-side > > > > > >> assignment > > > > > >> > >>>> interface would look. Generally, our idea would be to send > > all > > > > the > > > > > >> > >>>> input information for the task assignor to a client and > > expect > > > > all > > > > > >> the > > > > > >> > >>>> output information of a task assignor in return, very > > similar > > > > to > > > > > >> > >>>> KIP-848. We did not include such a table because we do not > > > > define a > > > > > >> > >>>> public interface to define custom broker-side assignors in > > this > > > > > >> KIP. > > > > > >> > >>>> But generally, the input / output of the (task) assignor > > mostly > > > > > >> > >>>> follows the data fields sent to / from the brokers. Let me > > do > > > > that, > > > > > >> > >>>> but first I'll respond to the rest of your questions that > > seem > > > > more > > > > > >> > >>>> important. > > > > > >> > >>>> > > > > > >> > >>>> S19. 
I think an explicit trigger for rebalance could make > > > > sense, > > > > > >> but > > > > > >> > >>>> let me explain the thinking why we haven't included this > > in the > > > > > >> > >>>> current KIP: We don't want to include it for client-side > > > > assignment > > > > > >> > >>>> obviously, because this KIP does not include client-side > > > > > >> assignment. > > > > > >> > >>>> For triggering a rebalance when a warm-up has caught up, > > this > > > > is > > > > > >> > >>>> treated differently in the KIP: the task assignor is called > > > > when a > > > > > >> > >>>> client sends updated task offsets. The client does not > > update > > > > its > > > > > >> task > > > > > >> > >>>> offsets regularly, but according to an interval > > > > > >> > >>>> `task.offset.interval.ms`, but also immediately when a > > client > > > > > >> notices > > > > > >> > >>>> that a warm-up has caught up. So effectively, it is mostly > > the > > > > > >> client > > > > > >> > >>>> that is constantly checking the lag. I absolutely believe > > that > > > > the > > > > > >> > >>>> client should be checking for `acceptable.recovery.lag`, > > and > > > > we are > > > > > >> > >>>> using the presence of the task offset field to trigger > > > > > >> reassignment. > > > > > >> > >>>> What we weren't sure is how often we need the task offsets > > to > > > > be > > > > > >> > >>>> updated and how often we want to retrigger the assignment > > > > because > > > > > >> of > > > > > >> > >>>> that. An explicit field for "forcing" a reassignment may > > still > > > > make > > > > > >> > >>>> sense, so that we can update the task offsets on the broker > > > > more > > > > > >> often > > > > > >> > >>>> than we call the assignor. However, I'm not convinced that > > this > > > > > >> should > > > > > >> > >>>> just be a generic "rebalance trigger", as this seems to be > > > > going > > > > > >> > >>>> against the goals of KIP-848. Generally, I think one of the > > > > > >> principles > > > > > >> > >>>> of KIP-848 is that a member just describes its current > > state > > > > and > > > > > >> > >>>> metadata, and the group coordinator makes a decision on > > whether > > > > > >> > >>>> reassignment needs to be triggered. So it would seem more > > > > natural > > > > > >> to > > > > > >> > >>>> just indicate a boolean "I have caught-up warm-up tasks" > > to the > > > > > >> group > > > > > >> > >>>> coordinator. I understand that for client-side assignment, > > you > > > > may > > > > > >> > >>>> want to trigger assignment explicitely without the group > > > > > >> coordinator > > > > > >> > >>>> understanding why, but then we should add such a field as > > part > > > > of > > > > > >> the > > > > > >> > >>>> client-side assignment KIP. > > > > > >> > >>>> > > > > > >> > >>>> S20. Yes, inside the group coordinator, the main > > differences > > > > are > > > > > >> > >>>> topology initialization handling, and the semantics of task > > > > > >> assignment > > > > > >> > >>>> vs. partition assignment. It will not be a completely > > separate > > > > > >> group > > > > > >> > >>>> coordinator, and we will not change things like the offset > > > > reading > > > > > >> and > > > > > >> > >>>> writing. 
What will have its own code path is the handling > > of > > > > > >> > >>>> heartbeats and the reconciliation of the group (so, which > > > > tasks are > > > > > >> > >>>> sent to which member at which point in time until we reach > > the > > > > > >> > >>>> declarative target assignment that was produced by a > > > > client-side or > > > > > >> > >>>> server-side assignor). Also, the assignor implementations > > will > > > > > >> > >>>> obviously be completely separate from the consumer group > > > > assignors, > > > > > >> > >>>> but that is already the case currently. Streams groups will > > > > > >> co-exist > > > > > >> > >>>> in the group coordinator with share groups (for queues), > > > > classic > > > > > >> > >>>> groups (old generic protocol for consumers and connect) and > > > > > >> consumer > > > > > >> > >>>> groups (from KIP-848), and potentially a new group type for > > > > > >> connect as > > > > > >> > >>>> well. Each of these group types have a separate code path > > for > > > > heart > > > > > >> > >>>> beating, reconciling assignments, keeping track of the > > internal > > > > > >> state > > > > > >> > >>>> of the group and responding to specific requests from admin > > > > API, > > > > > >> like > > > > > >> > >>>> ConsumerGroupDescribe. For streams, there will be a certain > > > > amount > > > > > >> of > > > > > >> > >>>> duplication from consumer groups - for example, liveness > > > > tracking > > > > > >> for > > > > > >> > >>>> members (setting timers for the session timeout etc.) is > > > > > >> essentially > > > > > >> > >>>> the same between both groups, and the reconciliation logic > > is > > > > quite > > > > > >> > >>>> similar. We will make an effort to refactor as much of this > > > > code > > > > > >> in a > > > > > >> > >>>> way that it can be reused by both group types, and a lot of > > > > things > > > > > >> > >>>> have already been made generic with the introduction of > > share > > > > > >> groups > > > > > >> > >>>> in the group coordinator. > > > > > >> > >>>> > > > > > >> > >>>> S21. It's nice that we are discussing "why are we doing > > this" > > > > in > > > > > >> > >>>> question 21 :D. I think we are touching upon the reasoning > > in > > > > the > > > > > >> KIP, > > > > > >> > >>>> but let me explain the reasoning in more detail. You are > > > > right, the > > > > > >> > >>>> reason we want to implement custom RPCs for Kafka Streams > > is > > > > not > > > > > >> about > > > > > >> > >>>> broker-side vs. client-side assignment. The broker could > > > > implement > > > > > >> the > > > > > >> > >>>> Member Metadata versioning scheme of Kafka Streams, > > intercept > > > > > >> consumer > > > > > >> > >>>> group heartbeats that look like Kafka Streams heartbeats, > > > > > >> deserialize > > > > > >> > >>>> the custom metadata and run a broker-side assignor for > > Kafka > > > > > >> Streams, > > > > > >> > >>>> and finally serialize custom Kafka Streams assignment > > > > metadata, all > > > > > >> > >>>> based on KIP-848 RPCs. > > > > > >> > >>>> > > > > > >> > >>>> First of all, the main difference we see with custom RPCs > > is > > > > really > > > > > >> > >>>> that streams groups become a first-level concept for both > > the > > > > > >> broker > > > > > >> > >>>> and clients of the Kafka. This also goes into the answer > > for > > > > S20 > > > > > >> (what > > > > > >> > >>>> is different beside topology initialization and task > > > > > >> reconciliation). 
> > > > > >> > >>>> With streams groups, I can use the Admin API call to see > > the > > > > > >> current > > > > > >> > >>>> assignment, the target assignment, all the metadata > > necessary > > > > for > > > > > >> > >>>> assignment, and some information about the current > > topology in > > > > a > > > > > >> > >>>> human-readable and machine-readable form, instead of > > having to > > > > > >> parse > > > > > >> > >>>> it from logs on some client machine that may or may not > > have > > > > the > > > > > >> right > > > > > >> > >>>> log-level configured. This will help humans to understand > > what > > > > is > > > > > >> > >>>> going on, but can also power automated tools / CLIs / UIs, > > and > > > > I > > > > > >> think > > > > > >> > >>>> this is a major improvement for operating a streams > > > > application. > > > > > >> > >>>> Again, this is not about who is in control of the broker > > or the > > > > > >> > >>>> clients, the information becomes accessible for everybody > > who > > > > can > > > > > >> call > > > > > >> > >>>> the Admin API. It also does not depend on the cloud > > vendor, it > > > > > >> will be > > > > > >> > >>>> an improvement available for pure AK users, and will also > > work > > > > with > > > > > >> > >>>> client-side assignment (although obviously, if there is an > > > > > >> intention > > > > > >> > >>>> to start encoding proprietary data in black-box > > byte-arrays, > > > > this > > > > > >> > >>>> information remains hidden to users of the open source > > > > distribution > > > > > >> > >>>> again). > > > > > >> > >>>> > > > > > >> > >>>> I also think that the original proposal of reusing the new > > > > consumer > > > > > >> > >>>> group protocol for Kafka Streams has some downsides, that, > > > > while > > > > > >> not > > > > > >> > >>>> being deal-breakers, would make the new behaviour of Kafka > > > > Streams > > > > > >> > >>>> less than ideal. One example is the reconciliation: if we > > have > > > > a > > > > > >> > >>>> custom stream partition assignor (client- or broker-side > > > > doesn't > > > > > >> > >>>> matter), we can only control the target assignment that the > > > > group > > > > > >> > >>>> converges to, but none of the intermediate states that the > > > > group > > > > > >> > >>>> transitions through while attempting to reach the desired > > end > > > > > >> state. > > > > > >> > >>>> For example, there is no way to prevent the reconciliation > > > > loop of > > > > > >> > >>>> KIP-848 from giving an active task to processX before > > another > > > > > >> instance > > > > > >> > >>>> on processX has revoked the corresponding standby task. > > There > > > > is no > > > > > >> > >>>> synchronization point where these movements are done at the > > > > same > > > > > >> time > > > > > >> > >>>> as in the classic protocol, and on the broker we can't > > prevent > > > > > >> this, > > > > > >> > >>>> because the reconciliation loop in KIP-848 does not know > > what a > > > > > >> > >>>> standby task is, or what a process ID is. You could argue, > > you > > > > can > > > > > >> let > > > > > >> > >>>> the application run into LockExceptions in the meantime, > > but it > > > > > >> will > > > > > >> > >>>> be one of the things that will make Kafka Streams work less > > > > than > > > > > >> > >>>> ideal. Another example is task offsets. 
It is beneficial > > for > > > > > >> streams > > > > > >> > >>>> partition assignors to have relatively recent task offset > > > > > >> information, > > > > > >> > >>>> which, in KIP-848, needs to be sent to the assignor using a > > > > > >> black-box > > > > > >> > >>>> MemberMetadata byte array. This member metadata byte array > > is > > > > > >> > >>>> persisted to the consumer offset topic every time it > > changes > > > > (for > > > > > >> > >>>> failover). So we end up with a bad trade-off: if we send > > the > > > > task > > > > > >> > >>>> offset information to the broker very frequently, we will > > have > > > > to > > > > > >> > >>>> write those member metadata records very frequently as > > well. > > > > If we > > > > > >> do > > > > > >> > >>>> not send them frequently, streams task assignors will have > > very > > > > > >> > >>>> outdated lag information. In KIP-1071 we can make the > > decision > > > > to > > > > > >> not > > > > > >> > >>>> persist the task offset information, which avoids this > > > > trade-off. > > > > > >> It > > > > > >> > >>>> gives us up-to-date lag information in the usual case, but > > > > after > > > > > >> > >>>> fail-over of the group coordinator, our lag information > > may be > > > > > >> missing > > > > > >> > >>>> for a short period of time. These are two examples, but I > > > > think the > > > > > >> > >>>> general theme is, that with custom RPCs, Kafka Streams is > > in > > > > > >> control > > > > > >> > >>>> of how the group metadata is processed and how the tasks > > are > > > > > >> > >>>> reconciled. This is not just a consideration for now, but > > also > > > > the > > > > > >> > >>>> future: will KIP-848 reconciliation and metadata management > > > > always > > > > > >> > >>>> work "good enough" for Kafka Streams, or will there be use > > > > cases > > > > > >> where > > > > > >> > >>>> we want to evolve and customize the behaviour of Kafka > > Streams > > > > > >> groups > > > > > >> > >>>> independently of consumer groups. > > > > > >> > >>>> > > > > > >> > >>>> A final point that I would like to make, I don't think that > > > > custom > > > > > >> > >>>> streams RPCs are inherently a lot harder to do for the > > Kafka > > > > > >> Streams > > > > > >> > >>>> community than "just utilizing" KIP-848, at least not as > > much > > > > as > > > > > >> you > > > > > >> > >>>> would think. Many things in KIP-848, such customizable > > > > assignment > > > > > >> > >>>> metadata with a metadata versioning scheme are made to look > > > > like > > > > > >> > >>>> general-purpose mechanisms, but are effectively mechanisms > > for > > > > > >> Kafka > > > > > >> > >>>> Streams and also completely unimplemented at this point in > > any > > > > > >> > >>>> implementation of the Kafka protocol. We know that there is > > > > very > > > > > >> > >>>> little use of custom partition assignors in Kafka besides > > Kafka > > > > > >> > >>>> Streams, and we can expect the same to be true of these new > > > > > >> > >>>> "customizable" features of KIP-848. So, besides the Kafka > > > > Streams > > > > > >> > >>>> community, there is no one driving the implementation of > > these > > > > > >> > >>>> features. "Just utilizing" KIP-848 for Kafka Streams would > > > > > >> > >>>> effectively mean to implement all this, and still make a > > lot of > > > > > >> deep > > > > > >> > >>>> changes to Kafka Streams (as mechanisms like probing > > rebalances > > > > > >> etc. > > > > > >> > >>>> don't make sense in KIP-848). 
I think adapting KIP-848 to > > work > > > > with > > > > > >> > >>>> Kafka Streams either way is a big effort. Piggybacking on > > > > consumer > > > > > >> > >>>> group RPCs and extending the consumer group protocol to the > > > > point > > > > > >> that > > > > > >> > >>>> it can carry Kafka Streams on its back would not make > > things > > > > > >> > >>>> significantly easier. Some things, like online migration > > from > > > > the > > > > > >> old > > > > > >> > >>>> protocol to the new protocol or reimplementing metadata > > > > versioning > > > > > >> on > > > > > >> > >>>> top of KIP-848, may even be harder in KIP-848, than in > > > > KIP-1071. I > > > > > >> > >>>> don't think we can consider consumer group RPCs as a > > shortcut > > > > to > > > > > >> > >>>> revamp the streams rebalance protocol, but should be > > focussing > > > > on > > > > > >> what > > > > > >> > >>>> makes Kafka Streams work best down the road. > > > > > >> > >>>> > > > > > >> > >>>> Hope that makes sense! > > > > > >> > >>>> > > > > > >> > >>>> Cheers, > > > > > >> > >>>> Lucas > > > > > >> > >>>> > > > > > >> > >>>> On Fri, Sep 6, 2024 at 6:47 AM Sophie Blee-Goldman > > > > > >> > >>>> <sop...@responsive.dev> wrote: > > > > > >> > >>>>> > > > > > >> > >>>>> Thanks for another detailed response Lucas! Especially > > w.r.t > > > > how > > > > > >> the > > > > > >> > >>>> epochs > > > > > >> > >>>>> are defined. I also went back and re-read KIP-848 to > > refresh > > > > > >> myself, > > > > > >> > I > > > > > >> > >>>>> guess it wasn't clear to me how much of the next-gen > > consumer > > > > > >> > protocol we > > > > > >> > >>>>> are reusing vs what's being built from the ground up. > > > > > >> > >>>>> > > > > > >> > >>>>> S16. > > > > > >> > >>>>> > > > > > >> > >>>>>> It will become interesting when we want to include > > extra > > > > data > > > > > >> > after > > > > > >> > >>>> the > > > > > >> > >>>>>> initial protocol definition. > > > > > >> > >>>>> > > > > > >> > >>>>> I guess that's what I'm asking us to define -- how do we > > > > evolve > > > > > >> the > > > > > >> > >>>> schema > > > > > >> > >>>>> when expanding the protocol? I think we need to set a few > > > > public > > > > > >> > >>>> contracts > > > > > >> > >>>>> here, such as whether/which versions get bumped when. For > > > > example > > > > > >> if > > > > > >> > I > > > > > >> > >>>>> add/modify fields twice within a single Kafka release, do > > we > > > > still > > > > > >> > need > > > > > >> > >>>> to > > > > > >> > >>>>> update the version? I think the answer is "yes" since I > > know > > > > you > > > > > >> > >>>> Confluent > > > > > >> > >>>>> folks upgrade brokers more often than AK releases, but > > this is > > > > > >> > different > > > > > >> > >>>>> from how eg the SubscriptionInfo/AssignmentInfo of the > > > > > >> > >>>>> StreamsPartitionAssignor works today, hence I want to put > > all > > > > > >> that in > > > > > >> > >>>>> writing (for those of us who aren't familiar with how all > > this > > > > > >> works > > > > > >> > >>>>> already and may be modifying it in the future, say, to add > > > > > >> > client-side > > > > > >> > >>>>> assignment :P) > > > > > >> > >>>>> > > > > > >> > >>>>> Maybe a quick example of what to do to evolve this schema > > is > > > > > >> > sufficient? > > > > > >> > >>>>> > > > > > >> > >>>>> S17. 
> > > > > >> > >>>>> The KIP seems to throw the assignor, group coordinator, > > topic > > > > > >> > >>>>> initialization, and topology versioning all into a single > > > > black > > > > > >> box > > > > > >> > on > > > > > >> > >>>> the > > > > > >> > >>>>> broker. I understand much of this is intentionally being > > > > glossed > > > > > >> > over as > > > > > >> > >>>> an > > > > > >> > >>>>> implementation detail since it's a KIP, but it would > > really > > > > help > > > > > >> to > > > > > >> > tease > > > > > >> > >>>>> apart these distinct players and define their > > interactions. > > > > For > > > > > >> > example, > > > > > >> > >>>>> what are the inputs and outputs to the assignor? For > > example > > > > > >> could we > > > > > >> > >>>> get a > > > > > >> > >>>>> table like the "Data Model > > > > > >> > >>>>> < > > > > > >> > >>>> > > > > > >> > > > > > > >> > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel > > > > > >> > >>>>> " > > > > > >> > >>>>> in 848? > > > > > >> > >>>>> > > > > > >> > >>>>> KIP-848 goes into detail on these things, but again, it's > > > > unclear > > > > > >> > what > > > > > >> > >>>>> parts of 848 are and aren't supposed to be carried > > through. > > > > For > > > > > >> > example > > > > > >> > >>>> 848 > > > > > >> > >>>>> has a concept of assignor "reasons" and client-side > > assignors > > > > and > > > > > >> > >>>> assignor > > > > > >> > >>>>> metadata which can all trigger a rebalance/group epoch > > bump. > > > > > >> However > > > > > >> > in > > > > > >> > >>>> the > > > > > >> > >>>>> current plan for 1071 none of these exist. Which leads me > > to > > > > my > > > > > >> next > > > > > >> > >>>>> point... > > > > > >> > >>>>> > > > > > >> > >>>>> S18. > > > > > >> > >>>>> Many people use a managed Kafka service like Confluent > > Cloud > > > > where > > > > > >> > >>>> brokers > > > > > >> > >>>>> are on the bleeding edge but clients are upgraded less > > > > frequently. > > > > > >> > >>>> However, > > > > > >> > >>>>> the opposite is also true for a great many users: client > > > > > >> > applications are > > > > > >> > >>>>> easy to upgrade and bumped often to get new features, > > whereas > > > > the > > > > > >> > brokers > > > > > >> > >>>>> (often run by a different team altogether) are considered > > > > riskier > > > > > >> to > > > > > >> > >>>>> upgrade and stay on one version for a long time. We can > > argue > > > > > >> about > > > > > >> > which > > > > > >> > >>>>> case is more common -- in my personal experience it's > > > > actually the > > > > > >> > >>>> latter, > > > > > >> > >>>>> although I only realized this after my time at Confluent > > > > which was > > > > > >> > >>>>> obviously skewed towards meeting Confluent Cloud users -- > > but > > > > > >> > regardless, > > > > > >> > >>>>> we obviously need to support both cases as first-class > > > > citizens. > > > > > >> And > > > > > >> > I'm > > > > > >> > >>>> a > > > > > >> > >>>>> bit concerned that there's not enough in the KIP to ensure > > > > that > > > > > >> > >>>> client-side > > > > > >> > >>>>> assignment won't end up getting hacked into the protocol > > as an > > > > > >> > >>>> afterthought. > > > > > >> > >>>>> > > > > > >> > >>>>> Don't get me wrong, I'm still not demanding you guys > > implement > > > > > >> > >>>> client-side > > > > > >> > >>>>> assignors as part of this KIP. 
I'm happy to leave the > > > > > >> implementation > > > > > >> > of > > > > > >> > >>>>> that to a followup, but I feel we need to flesh out the > > basic > > > > > >> > direction > > > > > >> > >>>> for > > > > > >> > >>>>> this right now, as part of the v0 Streams protocol. > > > > > >> > >>>>> > > > > > >> > >>>>> I'm also not asking you to do this alone, and am happy to > > work > > > > > >> > with/for > > > > > >> > >>>> you > > > > > >> > >>>>> on this. I have one possible proposal sketched out > > already, > > > > > >> although > > > > > >> > >>>> before > > > > > >> > >>>>> I share that I just wanted to make sure we're aligned on > > the > > > > > >> > fundamentals > > > > > >> > >>>>> and have mapped out the protocol in full first (ie S17). > > Just > > > > > >> want to > > > > > >> > >>>> have > > > > > >> > >>>>> a rough plan for how clients-side assignment will fit > > into the > > > > > >> > picture, > > > > > >> > >>>> and > > > > > >> > >>>>> right now it's difficult to define because it's unclear > > where > > > > the > > > > > >> > >>>>> boundaries between the group coordinator and broker-side > > > > assignor > > > > > >> > are (eg > > > > > >> > >>>>> what are assignor inputs and outputs). > > > > > >> > >>>>> > > > > > >> > >>>>> I'm committed to ensuring this doesn't add extra work or > > > > delay to > > > > > >> > this > > > > > >> > >>>> KIP > > > > > >> > >>>>> for you guys. > > > > > >> > >>>>> > > > > > >> > >>>>> S19. > > > > > >> > >>>>> I think it could make sense to include a concept like the > > > > > >> "assignor > > > > > >> > >>>>> reason" from 848 as a top-level field and rebalance > > trigger > > > > (ie > > > > > >> > group > > > > > >> > >>>>> epoch bump). This would mainly be for the client-side > > > > assignor to > > > > > >> > >>>> trigger a > > > > > >> > >>>>> rebalance, but for another example, it seems like we want > > the > > > > > >> > brokers to > > > > > >> > >>>> be > > > > > >> > >>>>> constantly monitoring warmup tasks and processing lag > > updates > > > > to > > > > > >> know > > > > > >> > >>>> when > > > > > >> > >>>>> a warmup task is to be switched, which is heavy work. Why > > not > > > > just > > > > > >> > let > > > > > >> > >>>> the > > > > > >> > >>>>> client who owns the warmup and knows the lag decide when > > it > > > > has > > > > > >> > warmed up > > > > > >> > >>>>> and signal the broker? It's a small thing for a client to > > > > check > > > > > >> when > > > > > >> > its > > > > > >> > >>>>> task gets within the acceptable recovery lag and notify > > via > > > > the > > > > > >> > heartbeat > > > > > >> > >>>>> if so, but it seems like a rather big deal for the broker > > to > > > > be > > > > > >> > >>>> constantly > > > > > >> > >>>>> computing these checks for every task on every hb. Just > > food > > > > for > > > > > >> > thought, > > > > > >> > >>>>> I don't really have a strong preference over where the > > warmup > > > > > >> > decision > > > > > >> > >>>>> happens -- simply pointing it out that it fits very neatly > > > > into > > > > > >> the > > > > > >> > >>>>> "assignor reason"/"rebalance requested" framework. 
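For a sense of how small that client-side check is, a sketch: compare each warm-up task's restore lag against acceptable.recovery.lag and, if something has caught up, send fresh task offsets with the next heartbeat. The heartbeat field used to signal this is a KIP detail and is not defined here:

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class WarmupCheckSketch {
        static Set<String> caughtUpWarmups(final Map<String, Long> warmupLagByTaskId,
                                           final long acceptableRecoveryLag) {
            return warmupLagByTaskId.entrySet().stream()
                .filter(entry -> entry.getValue() <= acceptableRecoveryLag)
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
        }
    }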
Making > > > > this a > > > > > >> > >>>>> client-side decision also makes it easier to evolve and > > > > customize > > > > > >> > since > > > > > >> > >>>>> Kafka Streams users tend to have more control over > > > > > >> > upgrading/implementing > > > > > >> > >>>>> stuff in their client application vs the brokers (which is > > > > often a > > > > > >> > >>>>> different team altogether) > > > > > >> > >>>>> > > > > > >> > >>>>> S20. > > > > > >> > >>>>> To what extent do we plan to reuse implementations of 848 > > in > > > > > >> order to > > > > > >> > >>>> build > > > > > >> > >>>>> the new Streams group protocol? I'm still having some > > trouble > > > > > >> > >>>> understanding > > > > > >> > >>>>> where we are just adding stuff on top and where we plan to > > > > rebuild > > > > > >> > from > > > > > >> > >>>> the > > > > > >> > >>>>> ground up. For example we seem to share most of the epoch > > > > > >> > reconciliation > > > > > >> > >>>>> stuff but will have different triggers for the group epoch > > > > bump. > > > > > >> Is > > > > > >> > it > > > > > >> > >>>>> possible to just "plug in" the group epoch and let the > > > > consumer's > > > > > >> > group > > > > > >> > >>>>> coordinator figure stuff out from there? Will we have a > > > > completely > > > > > >> > >>>> distinct > > > > > >> > >>>>> Streams coordinator? I know this is an implementation > > detail > > > > but > > > > > >> imo > > > > > >> > it's > > > > > >> > >>>>> ok to get in the weeds a bit for a large and complex KIP > > such > > > > as > > > > > >> > this. > > > > > >> > >>>>> > > > > > >> > >>>>> Setting aside the topology initialization stuff, is the > > main > > > > > >> > difference > > > > > >> > >>>>> really just in how the group coordinator will be aware of > > > > tasks > > > > > >> for > > > > > >> > >>>> Streams > > > > > >> > >>>>> groups vs partitions for consumer groups? > > > > > >> > >>>>> > > > > > >> > >>>>> Which brings me to a high level question I feel I have to > > ask: > > > > > >> > >>>>> > > > > > >> > >>>>> S21. > > > > > >> > >>>>> Can you go into more detail on why we have to provide > > Kafka > > > > > >> Streams > > > > > >> > its > > > > > >> > >>>> own > > > > > >> > >>>>> RPCs as opposed to just utilizing the protocol from > > KIP-848? > > > > The > > > > > >> > Rejected > > > > > >> > >>>>> Alternatives does address this but the main argument there > > > > seems > > > > > >> to > > > > > >> > be in > > > > > >> > >>>>> implementing a broker-side assignor. But can't you do this > > > > with > > > > > >> 848? > > > > > >> > And > > > > > >> > >>>>> given there are users who rarely upgrade their brokers > > just as > > > > > >> there > > > > > >> > are > > > > > >> > >>>>> users who rarely upgrade their clients, it seems silly to > > do > > > > all > > > > > >> this > > > > > >> > >>>> work > > > > > >> > >>>>> and change the fundamental direction of Kafka Streams > > just to > > > > give > > > > > >> > more > > > > > >> > >>>>> control over the assignment to the brokers. > > > > > >> > >>>>> > > > > > >> > >>>>> For a specific example, the KIP says "Client-side > > assignment > > > > > >> makes it > > > > > >> > >>>> hard > > > > > >> > >>>>> to debug and tune the assignment logic" -- personally, I > > would > > > > > >> find > > > > > >> > it > > > > > >> > >>>>> infinitely more difficult to tune and debug assignment > > logic > > > > if it > > > > > >> > were > > > > > >> > >>>>> happening on the broker. 
But that's just because I work > > with > > > > the > > > > > >> app > > > > > >> > >>>> devs, > > > > > >> > >>>>> not the cluster operators. The point is we shouldn't > > design an > > > > > >> entire > > > > > >> > >>>>> protocol to give preference to one vendor or another -- > > nobody > > > > > >> wants > > > > > >> > the > > > > > >> > >>>>> ASF to start yelling at us lol D: > > > > > >> > >>>>> > > > > > >> > >>>>> Another example, which imo is a good one (IIUC) is that "a > > > > simple > > > > > >> > >>>> parameter > > > > > >> > >>>>> change requires redeploying of all streams clients". > > That's > > > > > >> > definitely a > > > > > >> > >>>>> huge bummer, but again, why would broker-side assignors > > not be > > > > > >> able > > > > > >> > to > > > > > >> > >>>>> trigger a reassignment based on a custom config in the 848 > > > > > >> protocol? > > > > > >> > We > > > > > >> > >>>>> could still have the ha and sticky assignors implemented > > on > > > > the > > > > > >> > brokers, > > > > > >> > >>>>> and could still define all the Streams configs as broker > > > > configs > > > > > >> to > > > > > >> > be > > > > > >> > >>>>> passed into the streams custom broker assignors. That > > feels > > > > like a > > > > > >> > lot > > > > > >> > >>>> less > > > > > >> > >>>>> work than redoing our own group protocol from scratch? > > > > > >> > >>>>> > > > > > >> > >>>>> I don't want you guys to think I'm trying to block the > > entire > > > > > >> > direction > > > > > >> > >>>> of > > > > > >> > >>>>> this KIP. I'd just like to better understand your > > thoughts on > > > > this > > > > > >> > since > > > > > >> > >>>> I > > > > > >> > >>>>> assume you've discussed this at length internally. Just > > help > > > > me > > > > > >> > >>>> understand > > > > > >> > >>>>> why this huge proposal is justified (for my own sake and > > to > > > > > >> convince > > > > > >> > any > > > > > >> > >>>>> "others" who might be skeptical) > > > > > >> > >>>>> > > > > > >> > >>>>> > > > > > >> > >>>>> On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax < > > > > mj...@apache.org> > > > > > >> > wrote: > > > > > >> > >>>>> > > > > > >> > >>>>>> I still need to catch up on the discussion in general. > > But as > > > > > >> > promised, > > > > > >> > >>>>>> I just started KIP-1088 about the `KafkaClientSupplier` > > > > question. > > > > > >> > >>>>>> > > > > > >> > >>>>>> Looking forward to your feedback on the new KIP. I hope > > we > > > > can > > > > > >> get > > > > > >> > >>>>>> KIP-1088 done soon, to not block this KIP. > > > > > >> > >>>>>> > > > > > >> > >>>>>> > > > > > >> > >>>>>> -Matthias > > > > > >> > >>>>>> > > > > > >> > >>>>>> On 9/4/24 09:52, Lucas Brutschy wrote: > > > > > >> > >>>>>>> Hi Sophie, > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> thanks for the questions and comments! > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S1. KIP-1071 does not need to add public interfaces to > > > > streams, > > > > > >> but > > > > > >> > >>>> it > > > > > >> > >>>>>>> does not (yet) make the streams protocol client > > pluggable. > > > > > >> Matthias > > > > > >> > >>>>>>> promised to write a KIP that will propose a solution > > for the > > > > > >> > >>>>>>> KafkaClientSupplier soon. He suggested keeping it > > separate > > > > from > > > > > >> > this > > > > > >> > >>>>>>> KIP, but we can treat it as a blocker. 
You are right > > that we > > > > > >> could > > > > > >> > >>>> try > > > > > >> > >>>>>>> fitting an explicit topic initialization in this > > interface, > > > > if > > > > > >> we > > > > > >> > >>>>>>> decide we absolutely need it now, although I'm not sure > > if > > > > that > > > > > >> > would > > > > > >> > >>>>>>> actually define a useful workflow for the users to > > > > explicitly > > > > > >> > update > > > > > >> > >>>> a > > > > > >> > >>>>>>> topology. Explicit initialization cannot be part of > > admin > > > > tools, > > > > > >> > >>>>>>> because we do not have the topology there. It could > > > > probably be > > > > > >> a > > > > > >> > >>>>>>> config that defaults to off, or a method on the > > KafkaStreams > > > > > >> > object? > > > > > >> > >>>>>>> To me, explicit initialization opens a broad discussion > > on > > > > how > > > > > >> to > > > > > >> > do > > > > > >> > >>>>>>> it, so I would like to avoid including it in this KIP. I > > > > guess > > > > > >> it > > > > > >> > >>>> also > > > > > >> > >>>>>>> depends on the outcome of the discussion in S8-S10. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S2. It's hard to define the perfect default, and we > > should > > > > > >> probably > > > > > >> > >>>>>>> aim for safe operations first - since people can always > > up > > > > the > > > > > >> > limit. > > > > > >> > >>>>>>> I changed it to 20. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S11. I was going to write we can add it, but it's > > actually > > > > quite > > > > > >> > >>>>>>> independent of KIP-1071 and also a nice newbie ticket, > > so I > > > > > >> would > > > > > >> > >>>>>>> propose we treat it as out-of-scope and let somebody > > define > > > > it > > > > > >> in a > > > > > >> > >>>>>>> separate KIP - any discussion about it would anyway be > > lost > > > > in > > > > > >> this > > > > > >> > >>>>>>> long discussion thread. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S12. Member epoch behaves the same as in KIP-848. See > > the > > > > > >> KIP-848 > > > > > >> > >>>>>>> explanation for basics: > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> > > > > > >> > >>>>>> > > > > > >> > >>>> > > > > > >> > > > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> Together with the section "Reconciliation of the group", > > > > this > > > > > >> > should > > > > > >> > >>>>>>> make clear how member epochs, group epochs and > > assignment > > > > epochs > > > > > >> > work > > > > > >> > >>>>>>> in KIP-1071, but I'll try to summarize the > > reconciliation a > > > > bit > > > > > >> > more > > > > > >> > >>>>>>> explicitly below: > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> Essentially, the broker drives a reconciliation for each > > > > member > > > > > >> > >>>>>>> separately. The goal is to reconcile the assignment > > > > generated by > > > > > >> > the > > > > > >> > >>>>>>> assignor and reach the assignment epoch, but it's an > > > > > >> asynchronous > > > > > >> > >>>>>>> process, so each member may still be on an older epoch, > > > > which is > > > > > >> > the > > > > > >> > >>>>>>> member epoch. 
We do not reinvent any of the internal > > > > machinery > > > > > >> for > > > > > >> > >>>>>>> streams here, we are just adding standby tasks and > > warm-up > > > > > >> tasks to > > > > > >> > >>>>>>> the reconciliation of KIP-848. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> During reconciliation, the broker first asks the member > > to > > > > > >> revoke > > > > > >> > all > > > > > >> > >>>>>>> tasks that are not in its target assignment, by > > removing it > > > > from > > > > > >> > the > > > > > >> > >>>>>>> current assignment for the member (which is provided in > > the > > > > > >> > heartbeat > > > > > >> > >>>>>>> response). Until revocation is completed, the member > > > > remains on > > > > > >> the > > > > > >> > >>>>>>> old member epoch and in `UNREVOKED` state. Only once the > > > > member > > > > > >> > >>>>>>> confirms revocation of these tasks by removing them > > from the > > > > > >> set of > > > > > >> > >>>>>>> owned tasks in the heartbeat request, the member can > > > > transition > > > > > >> to > > > > > >> > >>>> the > > > > > >> > >>>>>>> current assignment epoch and get new tasks assigned. > > Once > > > > the > > > > > >> > member > > > > > >> > >>>>>>> has revoked all tasks that are not in its target > > > > assignment, it > > > > > >> > >>>>>>> transitions to the new epoch and gets new tasks > > assigned. > > > > There > > > > > >> are > > > > > >> > >>>>>>> some tasks that may be in the (declarative) target > > > > assignment of > > > > > >> > the > > > > > >> > >>>>>>> member generated by the assignor, but that may not be > > > > > >> immediately > > > > > >> > >>>>>>> added to the current assignment of the member, because > > they > > > > are > > > > > >> > still > > > > > >> > >>>>>>> "unreleased". Unreleased tasks are: > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> - A. For any task, another instance of the same process > > may > > > > own > > > > > >> the > > > > > >> > >>>>>>> task in any role (active/standby/warm-up), and we are > > still > > > > > >> > awaiting > > > > > >> > >>>>>>> revocation. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> - B. For an active task, an instance of any process may > > own > > > > the > > > > > >> > same > > > > > >> > >>>>>>> active task, and we are still awaiting revocation. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> As long as there is at least one unreleased task, the > > member > > > > > >> > remains > > > > > >> > >>>>>>> in state UNRELEASED. Once all tasks of the target > > > > assignment are > > > > > >> > >>>> added > > > > > >> > >>>>>>> to the current assignment of the member (because they > > were > > > > > >> > released), > > > > > >> > >>>>>>> the member transitions to STABLE. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> Technically, the first constraint (we cannot assign a > > task > > > > in > > > > > >> two > > > > > >> > >>>>>>> different roles to the same process) is only relevant > > for > > > > > >> stateful > > > > > >> > >>>>>>> tasks that use the local state directory. I wonder if we > > > > should > > > > > >> > relax > > > > > >> > >>>>>>> this restriction slightly, by marking sub-topologies > > that > > > > access > > > > > >> > the > > > > > >> > >>>>>>> local state directory. This information could also be > > used > > > > by > > > > > >> the > > > > > >> > >>>>>>> assignor. But we can also introduce as a follow-up > > > > improvement. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S13i. 
You are right, these bumps of the group epoch are > > "on > > > > > >> top" of > > > > > >> > >>>>>>> the KIP-848 reasons for bumping the group epoch. I > > added the > > > > > >> > KIP-848 > > > > > >> > >>>>>>> reasons as well, because this was a bit confusing in the > > > > KIP. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S13ii. The group epoch is bumped immediately once a > > warm-up > > > > task > > > > > >> > has > > > > > >> > >>>>>>> caught up, since this is what triggers running the > > assignor. > > > > > >> > However, > > > > > >> > >>>>>>> the assignment epoch is only bumped once the task > > assignor > > > > has > > > > > >> run. > > > > > >> > >>>> If > > > > > >> > >>>>>>> the task assignment for a member hasn't changed, it will > > > > > >> transition > > > > > >> > >>>> to > > > > > >> > >>>>>>> the new assignment epoch immediately. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S13iii. In the case of a rolling bounce, the group epoch > > > > will be > > > > > >> > >>>>>>> bumped for every member that leaves and joins the > > group. For > > > > > >> > updating > > > > > >> > >>>>>>> rack ID, client tags, topology ID etc. -- this will > > mostly > > > > play > > > > > >> a > > > > > >> > >>>> role > > > > > >> > >>>>>>> for static membership, as you mentioned, but would > > cause the > > > > > >> same > > > > > >> > >>>>>>> thing - in a rolling bounce with static membership, we > > will > > > > > >> attempt > > > > > >> > >>>> to > > > > > >> > >>>>>>> recompute the assignment after every bounce, if the > > > > topology ID > > > > > >> is > > > > > >> > >>>>>>> changed in the members. This can cause broker load, as > > you > > > > > >> > mentioned. > > > > > >> > >>>>>>> Note, however, that we don't have to compute an > > assignment > > > > for > > > > > >> > every > > > > > >> > >>>>>>> group epoch bump. While KIP-848 computes its assignment > > as > > > > part > > > > > >> of > > > > > >> > >>>> the > > > > > >> > >>>>>>> main loop of the group coordinator, it uses asynchronous > > > > > >> assignment > > > > > >> > >>>> in > > > > > >> > >>>>>>> the case of client-side assignment, where the > > assignment is > > > > > >> slower. > > > > > >> > >>>> So > > > > > >> > >>>>>>> it will kick off the assignment on the client side, > > enter > > > > state > > > > > >> > >>>>>>> `ASSIGNING`, but may have more group epoch bumps in the > > > > meantime > > > > > >> > >>>> while > > > > > >> > >>>>>>> the assignment is computed on the client side. We leave > > it > > > > open > > > > > >> to > > > > > >> > >>>> use > > > > > >> > >>>>>>> the same mechanism on the broker to compute assignment > > > > > >> > asynchronously > > > > > >> > >>>>>>> on a separate thread, in case streams assignment is too > > > > slow for > > > > > >> > the > > > > > >> > >>>>>>> main loop of the group coordinator. We will also > > consider > > > > > >> > >>>>>>> rate-limiting the assignor to reduce broker load. > > However, > > > > it's > > > > > >> too > > > > > >> > >>>>>>> early to define this in detail, as we don't know much > > about > > > > the > > > > > >> > >>>>>>> performance impact yet. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S13iv. The heartbeat just sends "null" instead of the > > actual > > > > > >> value > > > > > >> > in > > > > > >> > >>>>>>> case the value didn't change since the last heartbeat, > > so no > > > > > >> > >>>>>>> comparison is required. This is defined in the heartbeat > > > > RPC and > > > > > >> > the > > > > > >> > >>>>>>> same as in KIP-848. 
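A minimal sketch of that "send null if unchanged" convention, assuming the client tracks the last value it sent for each field (names are illustrative, not the actual RPC schema):

```java
import java.util.Objects;

// Hypothetical sketch of the delta-style heartbeat described above: the client
// remembers the last value it sent and only populates the field again when it
// changes; otherwise it sends null and the broker keeps its cached copy.
final class HeartbeatFieldTracker {
    private String lastSentRackId;

    /** Returns the value to put into the next heartbeat request, or null if unchanged. */
    String rackIdForNextHeartbeat(String currentRackId) {
        if (Objects.equals(currentRackId, lastSentRackId)) {
            return null; // unchanged since last heartbeat -> omitted from the request
        }
        lastSentRackId = currentRackId;
        return currentRackId;
    }
}
```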
> > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S13v. I think I disagree. The group epoch represents a > > > > version > > > > > >> of > > > > > >> > the > > > > > >> > >>>>>>> group state, including the current topology and the > > > > metadata of > > > > > >> all > > > > > >> > >>>>>>> members -- the whole input to the task assignor -- and > > > > defines > > > > > >> > >>>> exactly > > > > > >> > >>>>>>> the condition under which we attempt to recompute the > > > > > >> assignment. > > > > > >> > So > > > > > >> > >>>>>>> it's not overloaded, and has a clear definition. It's > > also > > > > > >> exactly > > > > > >> > >>>> the > > > > > >> > >>>>>>> same as in KIP-848, and we do not want to move away too > > far > > > > from > > > > > >> > that > > > > > >> > >>>>>>> protocol, unless it's something that is specific to > > streams > > > > that > > > > > >> > need > > > > > >> > >>>>>>> to be changed. Also, the protocol already uses three > > types > > > > of > > > > > >> > epochs > > > > > >> > >>>> - > > > > > >> > >>>>>>> member epoch, assignment epoch, group epoch. I don't > > think > > > > > >> adding > > > > > >> > >>>> more > > > > > >> > >>>>>>> epochs will make things clearer. Unless we have a strong > > > > reason > > > > > >> why > > > > > >> > >>>> we > > > > > >> > >>>>>>> need to add extra epochs, I'm not in favor. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S14. Assignment epoch is the same as in KIP-848: > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> > > > > > >> > >>>>>> > > > > > >> > >>>> > > > > > >> > > > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> Its definition does not change at all, so we didn't > > > > describe it > > > > > >> in > > > > > >> > >>>>>>> detail in KIP-1071. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S15. I agree that the name is a bit generic, but the > > time to > > > > > >> change > > > > > >> > >>>>>>> this was when `group.instance.id` was introduced. > > Also, if > > > > we > > > > > >> > start > > > > > >> > >>>>>>> diverging from regular consumers / consumer groups in > > terms > > > > of > > > > > >> > >>>> naming, > > > > > >> > >>>>>>> it will not make things clearer. "application.id" and " > > > > group.id > > > > > >> " > > > > > >> > are > > > > > >> > >>>>>>> bad enough, in my eyes. I hope it's okay if I just > > improve > > > > the > > > > > >> > field > > > > > >> > >>>>>>> documentation. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> S16. This goes back to KIP-482. Essentially, in flexible > > > > request > > > > > >> > >>>>>>> schema versions you can define optional fields that > > will be > > > > > >> encoded > > > > > >> > >>>>>>> with a field tag on the wire, but can also be completely > > > > > >> omitted. > > > > > >> > >>>>>>> This is similar to how fields are encoded in, e.g., > > > > protobuf. It > > > > > >> > >>>>>>> improves schema evolution, because we can add new > > fields to > > > > the > > > > > >> > >>>> schema > > > > > >> > >>>>>>> without bumping the version. There is also a minor > > > > difference in > > > > > >> > >>>>>>> encoding size: when encoded, tagged fields take up more > > > > bytes on > > > > > >> > the > > > > > >> > >>>>>>> wire because of the field tag, but they can be omitted > > > > > >> completely. 
> > > > > >> > It > > > > > >> > >>>>>>> will become interesting when we want to include extra > > data > > > > after > > > > > >> > the > > > > > >> > >>>>>>> initial protocol definition. > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> Cheers, > > > > > >> > >>>>>>> Lucas > > > > > >> > >>>>>>> > > > > > >> > >>>>>>> On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman > > > > > >> > >>>>>>> <sop...@responsive.dev> wrote: > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> Ah, my bad -- I thought I refreshed the page to get the > > > > latest > > > > > >> > >>>> version > > > > > >> > >>>>>>>> which is why I was a bit confused when I couldn't find > > > > anything > > > > > >> > >>>> about > > > > > >> > >>>>>> the > > > > > >> > >>>>>>>> new tools which I had previously seen in the KIP. Sorry > > > > for the > > > > > >> > >>>>>> confusion > > > > > >> > >>>>>>>> and unnecessary questions > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S1. > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>>> You could imagine calling the initialize RPC > > > > > >> > >>>>>>>>> explicitly (to implement explicit initialization), but > > > > this > > > > > >> would > > > > > >> > >>>>>>>>> still mean sending an event to the background thread, > > and > > > > the > > > > > >> > >>>>>>>>> background thread in turn invokes the RPC. However, > > > > explicit > > > > > >> > >>>>>>>>> initialization would require some additional public > > > > interfaces > > > > > >> > >>>> that we > > > > > >> > >>>>>>>>> are not including in this KIP. > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> I'm confused -- if we do all the group management > > stuff in > > > > a > > > > > >> > >>>> background > > > > > >> > >>>>>>>> thread to avoid needing public APIs, how do we pass > > all the > > > > > >> > >>>>>>>> Streams-specific and app-specific info to the broker > > for > > > > the > > > > > >> > >>>>>>>> StreamsGroupInitialize? I am guessing this is where we > > > > need to > > > > > >> > >>>> invoke > > > > > >> > >>>>>>>> internal APIs and is related to our discussion about > > > > removing > > > > > >> the > > > > > >> > >>>>>>>> KafkaClientSupplier. However, it seems to me that no > > > > matter how > > > > > >> > you > > > > > >> > >>>>>> look at > > > > > >> > >>>>>>>> it, Kafka Streams will need to pass information to and > > > > from the > > > > > >> > >>>>>> background > > > > > >> > >>>>>>>> thread if the background thread is to be aware of > > things > > > > like > > > > > >> > tasks, > > > > > >> > >>>>>> offset > > > > > >> > >>>>>>>> sums, repartition topics, etc > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> We don't really need to rehash everything here since it > > > > seems > > > > > >> like > > > > > >> > >>>> the > > > > > >> > >>>>>>>> proposal to add a StreamsConsumer/StreamsAssignment > > > > interface > > > > > >> will > > > > > >> > >>>>>> address > > > > > >> > >>>>>>>> this, which I believe we're in agreement on. I just > > wanted > > > > to > > > > > >> > point > > > > > >> > >>>> out > > > > > >> > >>>>>>>> that this work to replace the KafkaClientSupplier is > > > > clearly > > > > > >> > >>>> intimately > > > > > >> > >>>>>>>> tied up with KIP-1071, and should imho be part of this > > KIP > > > > and > > > > > >> not > > > > > >> > >>>> its > > > > > >> > >>>>>> own > > > > > >> > >>>>>>>> standalone thing. 
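Purely as an illustration of the hand-off being discussed, here is a sketch under the assumption that the application thread enqueues an event which the consumer background thread later turns into the initialize RPC (all type names are hypothetical and not proposed by the KIP):

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Purely illustrative sketch of how Streams-specific metadata could be handed
// from the application thread to the consumer background thread, which then
// sends the StreamsGroupInitialize RPC. None of these types exist in the KIP.
final class TopologyMetadataChannel {
    record SubtopologyMetadata(String subtopologyId,
                               List<String> sourceTopics,
                               List<String> repartitionTopics) {}

    record InitializeEvent(String topologyId, List<SubtopologyMetadata> subtopologies) {}

    private final BlockingQueue<InitializeEvent> events = new LinkedBlockingQueue<>();

    // Application thread: enqueue the topology description once it is built.
    void submit(InitializeEvent event) {
        events.add(event);
    }

    // Background thread: drain the event and turn it into the initialize RPC.
    InitializeEvent poll() {
        return events.poll();
    }
}
```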
> > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> By the way I'm happy to hop on a call to help move > > things > > > > > >> forward > > > > > >> > on > > > > > >> > >>>>>> that > > > > > >> > >>>>>>>> front (or anything really). Although I guess we're just > > > > waiting > > > > > >> > on a > > > > > >> > >>>>>> design > > > > > >> > >>>>>>>> right now? > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S2. I was suggesting something lower for the default > > upper > > > > > >> limit > > > > > >> > on > > > > > >> > >>>> all > > > > > >> > >>>>>>>> groups. I don't feel too strongly about it, just > > wanted to > > > > > >> point > > > > > >> > out > > > > > >> > >>>>>> that > > > > > >> > >>>>>>>> the tradeoff is not about the assignor runtime but > > rather > > > > about > > > > > >> > >>>> resource > > > > > >> > >>>>>>>> consumption, and that too many warmups could put undue > > > > load on > > > > > >> the > > > > > >> > >>>>>> cluster. > > > > > >> > >>>>>>>> In the end if we want to trust application operators > > then > > > > I'd > > > > > >> say > > > > > >> > >>>> it's > > > > > >> > >>>>>> fine > > > > > >> > >>>>>>>> to use a higher cluster-wide max like 100. > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S8-10 I'll get back to you on this in another followup > > > > since > > > > > >> I'm > > > > > >> > >>>> still > > > > > >> > >>>>>>>> thinking some things through and want to keep the > > > > discussion > > > > > >> > >>>> rolling for > > > > > >> > >>>>>>>> now. In the meantime I have some additional questions: > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S11. One of the main issues with unstable assignments > > > > today is > > > > > >> the > > > > > >> > >>>> fact > > > > > >> > >>>>>>>> that assignors rely on deterministic assignment and > > use the > > > > > >> > process > > > > > >> > >>>> Id, > > > > > >> > >>>>>>>> which is not configurable and only persisted via local > > > > disk in > > > > > >> a > > > > > >> > >>>>>>>> best-effort attempt. It would be a very small change to > > > > include > > > > > >> > >>>> this in > > > > > >> > >>>>>>>> KIP-1071 (like 5 LOC), WDYT? (Would even be willing to > > do > > > > the > > > > > >> PR > > > > > >> > for > > > > > >> > >>>>>> this > > > > > >> > >>>>>>>> myself so as not to add to your load). There's a ticket > > > > for it > > > > > >> > here: > > > > > >> > >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-15190 > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S12. What exactly is the member epoch and how is it > > > > defined? > > > > > >> When > > > > > >> > >>>> is it > > > > > >> > >>>>>>>> bumped? I see in the HB request field that certain > > values > > > > > >> signal a > > > > > >> > >>>>>> member > > > > > >> > >>>>>>>> joining/leaving, but I take it there are valid positive > > > > values > > > > > >> > >>>> besides > > > > > >> > >>>>>> the > > > > > >> > >>>>>>>> 0/-1/-2 codes? More on this in the next question: > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S13. The KIP says the group epoch is updated in these > > three > > > > > >> cases: > > > > > >> > >>>>>>>> a. Every time the topology is updated through the > > > > > >> > >>>>>> StreamsGroupInitialize API > > > > > >> > >>>>>>>> b. When a member with an assigned warm-up task reports > > a > > > > task > > > > > >> > >>>> changelog > > > > > >> > >>>>>>>> offset and task changelog end offset whose difference > > is > > > > less > > > > > >> that > > > > > >> > >>>>>>>> acceptable.recovery.lag. > > > > > >> > >>>>>>>> c. 
When a member updates its topology ID, rack ID, > > client > > > > tags > > > > > >> or > > > > > >> > >>>>>> process > > > > > >> > >>>>>>>> ID. Note: Typically, these do not change within the > > > > lifetime > > > > > >> of a > > > > > >> > >>>>>> Streams > > > > > >> > >>>>>>>> client, so this only happens when a member with static > > > > > >> membership > > > > > >> > >>>>>> rejoins > > > > > >> > >>>>>>>> with an updated configuration. > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S13.i First, just to clarify, these are not the *only* > > > > times > > > > > >> the > > > > > >> > >>>> group > > > > > >> > >>>>>>>> epoch is bumped but rather the additional cases on top > > of > > > > the > > > > > >> > >>>> regular > > > > > >> > >>>>>>>> consumer group protocol -- so it's still bumped when a > > new > > > > > >> member > > > > > >> > >>>> joins > > > > > >> > >>>>>> or > > > > > >> > >>>>>>>> leaves, etc, right? > > > > > >> > >>>>>>>> S13.ii Second, does (b) imply the epoch is bumped just > > > > whenever > > > > > >> > the > > > > > >> > >>>>>> broker > > > > > >> > >>>>>>>> notices a task has finished warming up, or does it > > first > > > > > >> reassign > > > > > >> > >>>> the > > > > > >> > >>>>>> task > > > > > >> > >>>>>>>> and then bump the group epoch as a result of this task > > > > > >> > reassignment? > > > > > >> > >>>>>>>> S13.iii Third, (a) mentions the group epoch is bumped > > on > > > > each > > > > > >> > >>>>>>>> StreamsGroupInitialize API but (c) says it gets bumped > > > > > >> whenever a > > > > > >> > >>>> member > > > > > >> > >>>>>>>> updates its topology ID. Does this mean that in the > > event > > > > of a > > > > > >> > >>>> rolling > > > > > >> > >>>>>>>> upgrade that changes the topology, the group epoch is > > > > bumped > > > > > >> just > > > > > >> > >>>> once > > > > > >> > >>>>>> or > > > > > >> > >>>>>>>> for each member that updates its topology ID? Or does > > the > > > > > >> > "updating > > > > > >> > >>>> its > > > > > >> > >>>>>>>> topology ID" somehow have something to do with static > > > > > >> membership? > > > > > >> > >>>>>>>> S13.iv How does the broker know when a member has > > changed > > > > its > > > > > >> rack > > > > > >> > >>>> ID, > > > > > >> > >>>>>>>> client tags, etc -- does it compare these values on > > every > > > > hb?? > > > > > >> > That > > > > > >> > >>>>>> seems > > > > > >> > >>>>>>>> like a lot of broker load. If this is all strictly for > > > > static > > > > > >> > >>>> membership > > > > > >> > >>>>>>>> then have we considered leaving static membership > > support > > > > for a > > > > > >> > >>>> followup > > > > > >> > >>>>>>>> KIP? Not necessarily advocating for that, just > > wondering if > > > > > >> this > > > > > >> > was > > > > > >> > >>>>>>>> thought about already. Imo it would be acceptable to > > not > > > > > >> support > > > > > >> > >>>> static > > > > > >> > >>>>>>>> membership in the first version (especially if we fixed > > > > other > > > > > >> > issues > > > > > >> > >>>>>> like > > > > > >> > >>>>>>>> making the process Id configurable to get actual stable > > > > > >> > >>>> assignments!) > > > > > >> > >>>>>>>> S13.v I'm finding the concept of group epoch here to be > > > > > >> somewhat > > > > > >> > >>>>>>>> overloaded. 
It sounds like it's trying to keep track of > > > > > >> multiple > > > > > >> > >>>> things > > > > > >> > >>>>>>>> within a single version: the group membership, the > > topology > > > > > >> > >>>> version, the > > > > > >> > >>>>>>>> assignment version, and various member statuses (eg > > rack > > > > > >> ID/client > > > > > >> > >>>>>> tags). > > > > > >> > >>>>>>>> Personally, I think it would be extremely valuable to > > > > unroll > > > > > >> all > > > > > >> > of > > > > > >> > >>>> this > > > > > >> > >>>>>>>> into separate epochs with clear definitions and > > boundaries > > > > and > > > > > >> > >>>> triggers. > > > > > >> > >>>>>>>> For example, I would suggest something like this: > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> group epoch: describes the group at a point in time, > > bumped > > > > > >> when > > > > > >> > >>>> set of > > > > > >> > >>>>>>>> member ids changes (ie static or dynamic member leaves > > or > > > > > >> joins) > > > > > >> > -- > > > > > >> > >>>>>>>> triggered by group coordinator noticing a change in > > group > > > > > >> > membership > > > > > >> > >>>>>>>> topology epoch: describes the topology version, bumped > > for > > > > each > > > > > >> > >>>>>> successful > > > > > >> > >>>>>>>> StreamsGroupInitialize that changes the broker's > > topology > > > > ID -- > > > > > >> > >>>>>> triggered > > > > > >> > >>>>>>>> by group coordinator bumping the topology ID (ie > > > > > >> > >>>> StreamsGroupInitialize) > > > > > >> > >>>>>>>> (optional) member epoch: describes the member version, > > > > bumped > > > > > >> > when a > > > > > >> > >>>>>>>> memberId changes its rack ID, client tags, or process > > ID > > > > (maybe > > > > > >> > >>>> topology > > > > > >> > >>>>>>>> ID, not sure) -- is this needed only for static > > membership? > > > > > >> Going > > > > > >> > >>>> back > > > > > >> > >>>>>> to > > > > > >> > >>>>>>>> question S12, when is the member epoch is bumped in the > > > > current > > > > > >> > >>>>>> proposal? > > > > > >> > >>>>>>>> assignment epoch: describes the assignment version, > > bumped > > > > when > > > > > >> > the > > > > > >> > >>>>>>>> assignor runs successfully (even if the actual > > assignment > > > > of > > > > > >> tasks > > > > > >> > >>>> does > > > > > >> > >>>>>> not > > > > > >> > >>>>>>>> change) -- can be triggered by tasks completing > > warmup, a > > > > > >> manual > > > > > >> > >>>>>>>> client-side trigger (future KIP only), etc (by > > definition > > > > is > > > > > >> > bumped > > > > > >> > >>>> any > > > > > >> > >>>>>>>> time any of the other epochs are bumped since they all > > > > trigger > > > > > >> a > > > > > >> > >>>>>>>> reassignment) > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S14. The KIP also mentions a new concept of an > > "assignment > > > > > >> epoch" > > > > > >> > >>>> but > > > > > >> > >>>>>>>> doesn't seem to ever elaborate on how this is defined > > and > > > > what > > > > > >> > it's > > > > > >> > >>>> used > > > > > >> > >>>>>>>> for. I assume it's bumped each time the assignment > > changes > > > > and > > > > > >> > >>>>>> represents a > > > > > >> > >>>>>>>> sort of "assignment version", is that right? Can you > > > > describe > > > > > >> in > > > > > >> > >>>> more > > > > > >> > >>>>>>>> detail the difference between the group epoch and the > > > > > >> assignment > > > > > >> > >>>> epoch? > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S15. 
I assume the "InstanceId" field in the HB request > > > > > >> corresponds > > > > > >> > >>>> to > > > > > >> > >>>>>> the > > > > > >> > >>>>>>>> group.instance.id config of static membership. I > > always > > > > felt > > > > > >> this > > > > > >> > >>>> field > > > > > >> > >>>>>>>> name was too generic and easy to get mixed up with > > other > > > > > >> > >>>> identifiers, > > > > > >> > >>>>>> WDYT > > > > > >> > >>>>>>>> about "StaticId" or "StaticInstanceId" to make the > > static > > > > > >> > membership > > > > > >> > >>>>>>>> relation more explicit? > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> S16. what is the "flexibleVersions" field in the RPC > > > > > >> protocols? I > > > > > >> > >>>> assume > > > > > >> > >>>>>>>> this is related to versioning compatibility but it's > > not > > > > quite > > > > > >> > clear > > > > > >> > >>>>>> from > > > > > >> > >>>>>>>> the KIP how this is to be used. For example if we > > modify > > > > the > > > > > >> > >>>> protocol by > > > > > >> > >>>>>>>> adding new fields at the end, we'd bump the version but > > > > should > > > > > >> the > > > > > >> > >>>>>>>> flexibleVersions field change as well? > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>> On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy > > > > > >> > >>>>>>>> <lbruts...@confluent.io.invalid> wrote: > > > > > >> > >>>>>>>> > > > > > >> > >>>>>>>>> Hi Sophie, > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>>>>> Thanks for your detailed comments - much appreciated! > > I > > > > think > > > > > >> you > > > > > >> > >>>> read > > > > > >> > >>>>>>>>> a version of the KIP that did not yet include the > > admin > > > > > >> > >>>> command-line > > > > > >> > >>>>>>>>> tool and the Admin API extensions, so some of the > > > > comments are > > > > > >> > >>>> already > > > > > >> > >>>>>>>>> addressed in the KIP. > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>>>>> S1. StreamsGroupHeartbeat and StreamsGroupInitialize > > are > > > > > >> called > > > > > >> > in > > > > > >> > >>>> the > > > > > >> > >>>>>>>>> consumer background thread. Note that in the new > > consumer > > > > > >> > threading > > > > > >> > >>>>>>>>> model, all RPCs are run by the background thread. > > Check > > > > out > > > > > >> this: > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>> > > > > > >> > >>>> > > > > > >> > > > > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design > > > > > >> > >>>>>>>>> for more information. Both our RPCs are just part of > > the > > > > group > > > > > >> > >>>>>>>>> membership management and do not need to be invoked > > > > > >> explicitly by > > > > > >> > >>>> the > > > > > >> > >>>>>>>>> application thread. You could imagine calling the > > > > initialize > > > > > >> RPC > > > > > >> > >>>>>>>>> explicitly (to implement explicit initialization), but > > > > this > > > > > >> would > > > > > >> > >>>>>>>>> still mean sending an event to the background thread, > > and > > > > the > > > > > >> > >>>>>>>>> background thread in turn invokes the RPC. However, > > > > explicit > > > > > >> > >>>>>>>>> initialization would require some additional public > > > > interfaces > > > > > >> > >>>> that we > > > > > >> > >>>>>>>>> are not including in this KIP. StreamsGroupDescribe is > > > > called > > > > > >> by > > > > > >> > >>>> the > > > > > >> > >>>>>>>>> AdminClient, and used by the command-line tool > > > > > >> > >>>>>>>>> kafka-streams-groups.sh. > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>>>>> S2. 
I think the max.warmup.replicas=100 suggested by Nick was intended as the upper limit for setting the group configuration on the broker. Just to make sure that this was not a misunderstanding. By default, values above 100 should be rejected when setting a specific value for group. Are you suggesting 20 or 30 for the default value for groups, or the default upper limit for the group configuration?
> > > > > >> > >>>>>>>>>
> > > > > >> > >>>>>>>>> S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The MemberEpoch=-1 is a left-over from an earlier discussion. It means that the member is leaving the group, so the intention was that the member must leave the group when it asks the other members to shut down. We later reconsidered this and decided that all applications should just react to the shutdown application signal that is returned by the broker, so the client first sets the ShutdownApplication and later leaves the group. Thanks for spotting this, I removed it.
> > > > > >> > >>>>>>>>>
> > > > > >> > >>>>>>>>> S4. Not sure if this refers to the latest version of the KIP. We added an extension of the admin API and a kafka-streams-groups.sh command-line tool to the KIP already.
> > > > > >> > >>>>>>>>>
> > > > > >> > >>>>>>>>> S5. All RPCs for dealing with offsets will keep working with streams groups. The extension of the admin API is rather cosmetic, since the method names use "consumer group". The RPCs, however, are generic and do not need to be changed.
> > > > > >> > >>>>>>>>>
> > > > > >> > >>>>>>>>> S6. Yes, you can use the DeleteGroup RPC with any group ID, whether streams group or not.
> > > > > >> > >>>>>>>>>
> > > > > >> > >>>>>>>>> S7. See the admin API section.
> > > > > >> > >>>>>>>>>
> > > > > >> > >>>>>>>>> S8. I guess for both A and B, I am not sure what you are suggesting. Do you want to make the broker-side topology immutable and not include any information about the topology, like the topology ID in the RPC? It would seem that this would be a massive foot-gun for people, if they start changing their topology and don't notice that the broker is looking at a completely different version of the topology. 
Or > > > > > >> did > > > > > >> > >>>> you > > > > > >> > >>>>>>>>> mean that there is some kind of topology ID, so that > > at > > > > least > > > > > >> we > > > > > >> > >>>> can > > > > > >> > >>>>>>>>> detect inconsistencies between broker and client-side > > > > > >> topologies, > > > > > >> > >>>> and > > > > > >> > >>>>>>>>> we fence out any member with an incorrect topology ID? > > > > Then we > > > > > >> > >>>> seem to > > > > > >> > >>>>>>>>> end up with mostly the same RPCs and the same > > questions > > > > (how > > > > > >> is > > > > > >> > the > > > > > >> > >>>>>>>>> topology ID generated?). I agree that the latter > > could be > > > > an > > > > > >> > >>>> option. > > > > > >> > >>>>>>>>> See summary at the end of this message. > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>>>>> S9. If we flat out refuse topology updates, agree - we > > > > cannot > > > > > >> let > > > > > >> > >>>> the > > > > > >> > >>>>>>>>> application crash on minor changes to the topology. > > > > However, > > > > > >> if > > > > > >> > we > > > > > >> > >>>>>>>>> allow topology updates as described in the KIP, there > > are > > > > only > > > > > >> > >>>> upsides > > > > > >> > >>>>>>>>> to making the topology ID more sensitive. All it will > > > > cause is > > > > > >> > >>>> that a > > > > > >> > >>>>>>>>> client will have to resend a > > `StreamsGroupInitialize`, and > > > > > >> during > > > > > >> > >>>> the > > > > > >> > >>>>>>>>> rolling bounce, older clients will not get tasks > > assigned. > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>>>>> S10. The intention in the KIP is really just this - > > old > > > > > >> clients > > > > > >> > can > > > > > >> > >>>>>>>>> only retain tasks, but not get new ones. If the > > topology > > > > has > > > > > >> > >>>>>>>>> sub-topologies, these will initially be assigned to > > the > > > > new > > > > > >> > >>>> clients, > > > > > >> > >>>>>>>>> which also have the new topology. You are right that > > > > rolling > > > > > >> an > > > > > >> > >>>>>>>>> application where the old structure and the new > > structure > > > > are > > > > > >> > >>>>>>>>> incompatible (e.g. different subtopologies access the > > same > > > > > >> > >>>> partition) > > > > > >> > >>>>>>>>> will cause problems. But this will also cause > > problems in > > > > the > > > > > >> > >>>> current > > > > > >> > >>>>>>>>> protocol, so I'm not sure, if it's strictly a > > regression, > > > > it's > > > > > >> > just > > > > > >> > >>>>>>>>> unsupported (which we can only make the best effort to > > > > > >> detect). > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>>>>> In summary, for the topology ID discussion, I mainly > > see > > > > two > > > > > >> > >>>> options: > > > > > >> > >>>>>>>>> 1) stick with the current KIP proposal > > > > > >> > >>>>>>>>> 2) define the topology ID as the hash of the > > > > > >> > StreamsGroupInitialize > > > > > >> > >>>>>>>>> topology metadata only, as you suggested, and fence > > out > > > > > >> members > > > > > >> > >>>> with > > > > > >> > >>>>>>>>> an incompatible topology ID. > > > > > >> > >>>>>>>>> I think doing 2 is also a good option, but in the end > > it > > > > comes > > > > > >> > >>>> down to > > > > > >> > >>>>>>>>> how important we believe topology updates (where the > > set > > > > of > > > > > >> > >>>>>>>>> sub-topologies or set of internal topics are > > affected) to > > > > be. 
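If option 2 were chosen, the topology ID could be derived along these lines, assuming it hashes only the canonicalized StreamsGroupInitialize metadata (an illustrative sketch, not the KIP's definition):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.List;

// Illustrative sketch of option 2: derive the topology ID purely from the
// metadata sent in StreamsGroupInitialize, so that a member whose topology
// hashes differently can be detected (and fenced) by the broker.
final class TopologyIdHasher {
    static String topologyId(List<String> subtopologyDescriptions) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            // Hash the descriptions in a canonical (sorted) order so every
            // client of the same topology computes the same ID.
            for (String description : subtopologyDescriptions.stream().sorted().toList()) {
                digest.update(description.getBytes(StandardCharsets.UTF_8));
            }
            return HexFormat.of().formatHex(digest.digest());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```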
> > > > > >> The > > > > > >> > >>>> main > > > > > >> > >>>>>>>>> goal is to make the new protocol a drop-in replacement > > > > for the > > > > > >> > old > > > > > >> > >>>>>>>>> protocol, and at least define the RPCs in a way that > > we > > > > won't > > > > > >> > need > > > > > >> > >>>>>>>>> another KIP which bumps the RPC version to make the > > > > protocol > > > > > >> > >>>>>>>>> production-ready. Adding more command-line tools, or > > > > public > > > > > >> > >>>> interfaces > > > > > >> > >>>>>>>>> seems less critical as it doesn't change the protocol > > and > > > > can > > > > > >> be > > > > > >> > >>>> done > > > > > >> > >>>>>>>>> easily later on. The main question becomes - is the > > > > protocol > > > > > >> > >>>>>>>>> production-ready and sufficient to replace the classic > > > > > >> protocol > > > > > >> > if > > > > > >> > >>>> we > > > > > >> > >>>>>>>>> go with 2? > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>>>>> Cheers, > > > > > >> > >>>>>>>>> Lucas > > > > > >> > >>>>>>>>> > > > > > >> > >>>>>> > > > > > >> > >>>> > > > > > >> > >>>> > > > > > >> > >>> > > > > > >> > >> > > > > > >> > > > > > > >> > > > > > > >> > > > > > > > > > > > > > >