Hi Andrew,

thank you!

That was a bit sloppy, AS18/19 addressed.

Cheers,
Lucas

On Fri, Dec 6, 2024 at 10:14 AM Andrew Schofield
<andrew_schofield_j...@outlook.com> wrote:
>
> Hi Lucas,
> Thanks for the responses. I expect these are my final comments before
> voting.
>
> AS18: In StreamsGroupDescription, the method with signature
> `public StreamsGroupState state()` should be replaced by
> `public GroupState groupState()` for consistency with
> ConsumerGroupDescription and ShareGroupDescription.
>
> AS19: The text for GroupState has a copy-paste error and
> names GroupType instead.
>
> Thanks,
> Andrew
> ________________________________________
> From: Lucas Brutschy <lbruts...@confluent.io.INVALID>
> Sent: 04 December 2024 16:08
> To: dev@kafka.apache.org <dev@kafka.apache.org>
> Subject: Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
>
> Hi Andrew,
>
> thanks for the comments!
>
> You are right, we haven't fully caught up with the changed that were
> made in the final version of KIP-1043. I agree with all your points,
> and have made the corresponding changes in the KIP.
>
> Cheers,
> Lucas
>
> On Tue, Dec 3, 2024 at 6:51 PM Andrew Schofield
> <andrew_schofield_j...@outlook.com> wrote:
> >
> > Hi Bruno/Lucas,
> > A few comments related to consistency with the other KIPs.
> > As we now have multiple types of group, we need to work
> > out how to handle listing, describing and so on in a consistent way.
> >
> > AS14: KIP-1043 introduced Admin#listGroups and this is intended as the
> > way to list groups of all types. As a result, the specific methods and
> > classes which applied to just one type of group have been removed.
> >
> > Admin#listShareGroups does not exist (any more) and instead
> > Admin#listGroups with a filter on group type can be used instead.
> > There is no ShareGroupListing, ListShareGroupsOptions or
> > ListShareGroupsResult either.
> >
> > All of the equivalent stuff for consumer groups are being deprecated
> > and removed by KIP-1043 (probably in AK 4.1).
> >
> > I suggest that you remove Admin#listStreamsGroups also. It is not
> > necessary because Admin#listGroups can do it.
> >
> > AS15: Similarly, there is no ShareGroupState any longer, just
> > GroupState. A streams group has a specific state, NOT_READY.
> > I recommend that you add this state to GroupState, and
> > use GroupState instead of having StreamsGroupState.
> >
> > During the development of KIP-1043, it became clear that
> > having enumerations for all of the group types separately
> > was cumbersome and it prevented a state-based filter on
> > Admin#listGroups.
> >
> > AS16: The o.a,k.common.GroupType for streams should be STREAMS
> > not Streams.
> >
> > AS17: Please add streams groups to bin/kafka-groups.sh.
> >
> > Thanks,
> > Andrew
> > ________________________________________
> > From: Sophie Blee-Goldman <sop...@responsive.dev>
> > Sent: 20 November 2024 08:15
> > To: dev@kafka.apache.org <dev@kafka.apache.org>
> > Subject: Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
> >
> > Thanks Lucas! That all sounds good to me.
> >
> > I think I officially have nothing left to say or ask about this KIP, so
> > once you've
> > updated the doc with what we've discussed in the past few messages then
> > I'm personally feeling ready to vote on it.
> >
> > On Tue, Nov 19, 2024 at 11:10 PM Lucas Brutschy
> > <lbruts...@confluent.io.invalid> wrote:
> >
> > > Hi Sophie,
> > >
> > > S1. Yes, we'll include a list of missing topics or topic
> > > misconfigurations in the heartbeat response (StatusDetail). Of course,
> > > we will not only expose this via the streams group state metric which
> > > can be monitored, we will log the state of the group and the status
> > > detail on the client, so it should be pretty straight forward for new
> > > users to figure out the problem.
> > >
> > > As for the streams group state listener - the streams group state is
> > > sent with the heartbeat response, so it's not directly possible to
> > > implement it. Maybe it would make more sense to define the listener on
> > > the `status` code sent in the heartbeat response, which includes
> > > members-specific states (like STALE_TOPOLOGY). But yes, let's do this
> > > in a separate KIP. I'll create a ticket for it (once we have accepted
> > > this KIP and start creating tickets for KIP-1071).
> > >
> > > S2. Yes, the config `topology.epoch` would go to StreamsConfig.
> > > StreamsGroupDescribe RPC would return the current epoch. As Bruno
> > > suggested, the default of the `topology.epoch` would be 0 on the
> > > client and the epoch would start at -1 on the broker (or 1 and 0, if
> > > people prefer). So to first initialize a group, no config change would
> > > be required. People who reset their application or change the
> > > application ID upon redeploys would never have to touch the config.
> > >
> > > I think we don't need the topology ID to validate that the user did
> > > not change the topology without bumping the epoch. We keep the
> > > topology and topology epoch immutable during a client's lifetime, so
> > > it's enough that we validate this once when joining. When the client
> > > joins, we can check whether the topology_epoch & topology match the
> > > broker-side. All following heartbeats don't include the topology
> > > anymore, but we do not need to "reverify" that the topology epoch &
> > > topology are consistent, it is enough to compare epochs.
> > >
> > > Cheers,
> > > Lucas
> > >
> > > On Wed, Nov 20, 2024 at 4:56 AM Sophie Blee-Goldman
> > > <sop...@responsive.dev> wrote:
> > > >
> > > > Thanks Lucas! A few more questions to nail down the details but on the
> > > > whole I think this is a good plan
> > > >
> > > > S1. I definitely agree that the current behavior is inconsistent and not
> > > > ideal for missing topics. So I'm fine with changing this, just wanted to
> > > > make sure it was intentional. That said I do think it would be a good
> > > > exercise to think through the user experience here and what it will look
> > > > like in practice for someone to debug why their app is not doing 
> > > > anything
> > > > due to missing source topics. Especially because in my experience this
> > > > tends to happen most with relatively new users who are just trying out
> > > > Kafka Streams eg via the quickstart, or running and debugging in their
> > > test
> > > > environment, etc. Asking them to monitor the group state might be a lot
> > > for
> > > > some folks, especially if that has to be done via a separate cli they
> > > > probably don't even know about.  But I do think you're right that on the
> > > > whole, the application should stay up and running and not immediately 
> > > > die
> > > > if topics are missing. It sounds like we do plan to log a warning when
> > > this
> > > > happens as detected by the heartbeat, which helps. Would be even better
> > > if
> > > > we could return a list of the missing topic names in the heartbeat and
> > > > include that in the logs.
> > > >
> > > > Beyond that, what if we introduced a new kind of listener similar to the
> > > > KafkaStreams.State listener, but instead of listening in on the state of
> > > > the client it listens on the state of the Streams group? This could be
> > > done
> > > > in a followup KIP of course, just pointing out that we can improve on
> > > this
> > > > in the future without modifying anything in the current KIP. I'd prefer
> > > > this over introducing a timeout for missing topics,, personally. (If you
> > > > don't want to include it in the current KIP let's just file a ticket for
> > > it
> > > > perhaps?)
> > > >
> > > > S2. Ok just to hammer out the details, we're proposing that the topology
> > > > epoch be configurable via a new StreamsConfig, is that right? And that
> > > > there will be an Admin API that can be used to fetch the current 
> > > > topology
> > > > epoch for the group?
> > > >
> > > > How will this work if a user doesn't set the topology.epoch config? Do 
> > > > we
> > > > just initialize it to 0 and assume that the topology will always be the
> > > > same until/unless the user manually bumps it by the StreamsConfig? Would
> > > we
> > > > maybe want to keep the topology ID around so that we can verify the
> > > > topology ID remains the same and matches for a given topology epoch? Or
> > > > should we just leave it up to users to not screw with their topology
> > > > without bumping the topology epoch? To be fair, that's more or less what
> > > we
> > > > do now, but it feels like if we introduce a concept of topology version
> > > > then we should try to enforce it somehow. I'm not sure, what are your
> > > > thoughts here?
> > > >
> > > > Or does each client send the entire topology in each heartbeat? That 
> > > > kind
> > > > of seems like a lot of data to be passing around that often. We could
> > > > replace the member.topology with member.topology_ID for purposes of
> > > > verification (eg detecting the STREAMS_INVALID_TOPOLOGY_EPOCH case) and
> > > get
> > > > verification on every heartbeat without all the extra bytes. Probably 
> > > > not
> > > > an issue most of the time but people can run some truly huge topologies.
> > > >
> > > > On Tue, Nov 19, 2024 at 2:47 AM Lucas Brutschy
> > > > <lbruts...@confluent.io.invalid> wrote:
> > > >
> > > > > Hi Sophie,
> > > > >
> > > > > S1. You are reading it correctly. We added NOT_READY to bridge the
> > > > > time when internal topics are being created. Right now, the
> > > > > StreamsPartitioner just blocks until the internal topics are created,
> > > > > but we don't want to block in the broker. We want the client to be
> > > > > able to join while the internal topics may not be created, yet.
> > > > >
> > > > > We then went on to simplify the story about topic / topology
> > > > > mismatches to treat all such errors the same way - we enter NOT_READY,
> > > > > but keep the application running, and indicate the problem in the
> > > > > heartbeats. The idea is that the user just monitors the status of the
> > > > > group, which is exposed as a metric.
> > > > >
> > > > > We can try to make the behavior more like the StreamsPartitioner, but
> > > > > its behavior is quite inconsistent. For example, missing source topics
> > > > > are handled via an error code which shut down the application, but
> > > > > problems when creating the internal topics, or topics not being
> > > > > copartitioned, we just let TopologyException/TimeoutException fall
> > > > > through (which I believe fails the group leader only).
> > > > >
> > > > > We could consider shutting down the application upon missing /
> > > > > mispartitioned internal or source topics. I wonder if the simplest way
> > > > > to do that would be to introduce a timeout on the client side. I think
> > > > > the internal topic manager right now uses something like
> > > > > max.poll.interval*2. But I'm not sure if shutting down the application
> > > > > really simplifies things.
> > > > >
> > > > > S2. Let me try to add some details to the Bruno's proposal. Manually,
> > > > > to upgrade a topology, the user would just deploy a new version of the
> > > > > application, which has the new topology and a configuration change (in
> > > > > the code or wherever) and deploy it. If the user doesn't know the
> > > > > current topology epoch, they could fetch it using the streams groups
> > > > > command-line tool, or potentially a UI. Automating in CD would work
> > > > > the same way - determine the current topology epoch using admin API or
> > > > > the command-line tool, set the config (by changing a file, or
> > > > > launchdarkly or whatever your setup is) and deploy. It's definitely
> > > > > not completely trivial, but it is an advanced feature, after all. I
> > > > > made a rough sketch of error handling for topology epochs, I'll put
> > > > > this in the "appendix" below since it's quite detailed.
> > > > >
> > > > > S3. If we are going for topology epochs, which is a nice proposal, I
> > > > > am not sure if we actually need to have the topology ID anymore -
> > > > > since we can store the topology epoch that each member is currently
> > > > > at, and we cannot change the topology without bumping the epoch, the
> > > > > point of the ID (detecting that a member is working with a stale
> > > > > topology) can be replaced by comparing topology epochs. So if we go
> > > > > down this route, I’d suggest removing topology IDs. We could introduce
> > > > > them for naming purposes, but they are not required by the protocol,
> > > > > so I'd rather do that later, if ever.
> > > > >
> > > > > Cheers, Lucas
> > > > >
> > > > > ## Appendix: error handling for topology epochs.
> > > > >
> > > > > We’ll require some new/changed error codes / statuses to handle the
> > > > > updates correctly.
> > > > >
> > > > >  * Non-fatal status INCONSISTENT_TOPOLOGY is renamed to
> > > > > STALE_TOPOLOGY, since we now have something like a topology
> > > > > chronology. It is returned for members who are already part of the
> > > > > group, but at an old topology epoch
> > > > >  * Fatal error STREAMS_INVALID_TOPOLOGY_EPOCH if, for example, the
> > > > > topology was changed by the epoch not bumped.
> > > > >  * Fatal error STREAMS_TOPOLOGY_FENCED if we are trying to join with
> > > > > an outdated topology.
> > > > >
> > > > > So then all possible cases should be handled like this:
> > > > >
> > > > >  * When a member joins (heartbeat.member_epoch = 0)
> > > > >  ** Require both heartbeat.topology and heartbeat.topology_epoch to be
> > > set.
> > > > >  ** If heartbeat.topology_epoch = group.topology_epoch + 1, the member
> > > > > joins and updates topology ID/metadata/epoch for the group.
> > > > >  ** If heartbeat.topology_epoch = group.topology_epoch and
> > > > > heartbeat.topology = group.topology, the member joins without updating
> > > > > the topology ID/metadata/epoch for the group.
> > > > >  ** If heartbeat.topology_epoch = group.topology_epoch but
> > > > > heartbeat.topology != group.topology, we will fail with error
> > > > > STREAMS_INVALID_TOPOLOGY_EPOCH.
> > > > >  ** If heartbeat.topology_epoch > group.topology_epoch + 1, we fail
> > > > > with new error STREAMS_INVALID_TOPOLOGY_EPOCH
> > > > >  ** If heartbeat.topology_epoch < group.topology_epoch, we fail with
> > > > > STREAMS_TOPOLOGY_FENCED
> > > > > * Otherwise (if heartbeat.member_epoch != 0)
> > > > > ** Require that none of heartbeat.topology, heartbeat.topology_epoch
> > > > > are set in the request, and let member.topology_epoch be the topology
> > > > > epoch stored in the member metadata on the broker for that member.
> > > > >  * If member.topology_epoch < group.topology_epoch, the heartbeat is
> > > > > successful, but we set the state STALE_TOPOLOGY.
> > > > >  * If member.topology_epoch > group.topology_epoch, should be
> > > impossible.
> > > > >  * If member.topology_epoch = group.topology_epoch, the heartbeat is
> > > > > successful.
> > > > >
> > > > > Common cases that could happen in practice (each of them should have a
> > > > > different status / error code to be distinguishable):
> > > > >
> > > > >  * When a user rolls the application with a new topology with a
> > > > > different topology, but without bumping the topology epoch, the roll
> > > > > of the first client would directly fail with
> > > > > STREAMS_INVALID_TOPOLOGY_EPOCH, which would be treated as fatal and
> > > > > cause the first client to not come up - which cause the user to
> > > > > correctly detect the error.
> > > > >  * When a user rolls the application with a new topology epoch /
> > > topology
> > > > >  ** The first client being rolled will bump the topology epoch
> > > > >  ** All other clients will see that they are using an outdated
> > > > > topology by receiving a non-fatal STALE_TOPOLOGY error
> > > > >  ** If any of the stale clients have to rejoin the group before being
> > > > > updated to the new topology, they will fail fatally with
> > > > > STREAMS_TOPOLOGY_FENCED.
> > > > >
> > > > >
> > > > > On Mon, Nov 18, 2024 at 12:53 PM Nick Telford <nick.telf...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > Actually, scratch that. On reflection I think I prefer Bruno's
> > > original
> > > > > > idea to specify it in the configuration.
> > > > > >
> > > > > > Cheers,
> > > > > > Nick
> > > > > >
> > > > > > On Sat, 16 Nov 2024 at 17:59, Nick Telford <nick.telf...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hey everyone,
> > > > > > >
> > > > > > > With respect to Bruno's proposal, could instances cache their
> > > topology
> > > > > > > epoch on disk, and then upgrades/downgrades would simply involve
> > > > > deleting
> > > > > > > the cached epoch before starting the instance?
> > > > > > >
> > > > > > > My thinking is that this might be potentially simpler for users
> > > than
> > > > > > > modifying configuration.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Nick
> > > > > > >
> > > > > > > On Sat, 16 Nov 2024, 00:41 Sophie Blee-Goldman, <
> > > sop...@responsive.dev
> > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Thanks for the updates! Few minor questions before we wrap this
> > > up and
> > > > > > >> move
> > > > > > >> to a vote:
> > > > > > >>
> > > > > > >> S1. Can you clarify the outward-facing behavior of a group that
> > > > > enters the
> > > > > > >> NOT_READY state due to, say, missing source topics? It sounds
> > > like the
> > > > > > >> application will continue to run without processing anything,
> > > which
> > > > > would
> > > > > > >> be a change compared to today where we shut down with an explicit
> > > > > error.
> > > > > > >> Am
> > > > > > >> I reading this correctly and is this change intentional?
> > > > > > >>
> > > > > > >> S2. Regarding topology upgrades, I am fully in favor of Bruno's
> > > > > proposal.
> > > > > > >> Just feel like we need to flesh it out a bit. For example you
> > > say  "no
> > > > > > >> specific tooling" is needed, but how would a user go about
> > > upgrading
> > > > > the
> > > > > > >> topology? If we can just describe this process in a bit more
> > > detail
> > > > > then I
> > > > > > >> would feel good.
> > > > > > >>
> > > > > > >> S3. Lastly, just a quick reminder, we should make sure to clarify
> > > in
> > > > > the
> > > > > > >> KIP how the topology ID will be computed since that impacts which
> > > > > kind of
> > > > > > >> upgrades are possible and is therefore part of the public API.
> > > IIUC
> > > > > the
> > > > > > >> plan is to compute it from the topics which makes sense to me
> > > (though
> > > > > I do
> > > > > > >> think it should eventually be pluggable, perhaps in a followup
> > > KIP)
> > > > > > >>
> > > > > > >> On Thu, Nov 7, 2024 at 10:08 AM Bruno Cadonna <cado...@apache.org
> > > >
> > > > > wrote:
> > > > > > >>
> > > > > > >> > Hi Lucas and all others,
> > > > > > >> >
> > > > > > >> > Thanks for the proposals regarding the topology upgrades!
> > > > > > >> >
> > > > > > >> > I have an additional proposal:
> > > > > > >> >
> > > > > > >> > We introduce a topology epoch alongside the topology ID. The
> > > > > topology
> > > > > > >> > epoch is set to 0 on the group coordinator for a new group. The
> > > > > topology
> > > > > > >> > epoch is also a config within the Streams client, the default
> > > is 1.
> > > > > > >> >
> > > > > > >> > When the Streams client starts up, it sends its topology epoch
> > > > > alongside
> > > > > > >> > the topology ID and the topology to the group coordinator. The
> > > group
> > > > > > >> > coordinator initializes the topology with the given topology ID
> > > and
> > > > > > >> > increases the topology epoch to 1.
> > > > > > >> >
> > > > > > >> > When the client wants to upgrade a topology it needs to send a
> > > valid
> > > > > > >> > topology epoch alongside the new topology ID and the topology.
> > > We
> > > > > can
> > > > > > >> > decide if the new topology epoch = current topology at group
> > > > > coordinator
> > > > > > >> > + 1 or just new topology epoch > current topology on group
> > > > > coordinator.
> > > > > > >> >
> > > > > > >> > When the group coordinator receives a topology ID that differs
> > > from
> > > > > the
> > > > > > >> > one intialized for the group, the group coordinator only
> > > upgrades
> > > > > to the
> > > > > > >> > new topology if the topology epoch is valid. Then, it sets the
> > > > > topology
> > > > > > >> > epoch to the new value. If the topology epoch is not valid, 
> > > > > > >> > that
> > > > > means
> > > > > > >> > the topology ID is an old one and the group coordinator does 
> > > > > > >> > not
> > > > > > >> > re-initialize the topology.
> > > > > > >> >
> > > > > > >> > The current topology epoch can be retrieved with the describe
> > > RPC.
> > > > > > >> >
> > > > > > >> > Accidental rollbacks of topologies are not be possible. Only
> > > wanted
> > > > > > >> > rollbacks of topologies with an appropriate topology epoch are
> > > > > allowed.
> > > > > > >> >
> > > > > > >> > Specific tooling is not needed.
> > > > > > >> >
> > > > > > >> > The topology upgrade can be automated in CI in the following
> > > way:
> > > > > > >> >
> > > > > > >> > 1. Call streams group describe to determine the current epoch.
> > > > > > >> >
> > > > > > >> > 2. Set the Streams client config topology.epoch = current epoch
> > > + 1
> > > > > > >> >
> > > > > > >> > 3. Roll the application.
> > > > > > >> >
> > > > > > >> > WDYT?
> > > > > > >> >
> > > > > > >> > Best,
> > > > > > >> > Bruno
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On 07.11.24 18:26, Lucas Brutschy wrote:
> > > > > > >> > > Hi all,
> > > > > > >> > >
> > > > > > >> > > I have updated the KIP with some details around how we handle
> > > the
> > > > > > >> > > cases when essential topics required by the topology are not
> > > > > present.
> > > > > > >> > > This is described in a new section "Handling topic topology
> > > > > > >> > > mismatches". The short summary is that we enter a state where
> > > the
> > > > > > >> > > group member's heartbeats will succeed, but no assignments
> > > will be
> > > > > > >> > > made, and the nature of the topic misconfiguration is
> > > > > communicated to
> > > > > > >> > > the members using the `status` field of the heartbeat
> > > response.
> > > > > > >> > >
> > > > > > >> > > Working towards putting up this KIP up for a vote, there was
> > > one
> > > > > last
> > > > > > >> > > concern mentioned during our last sync, which was a concern
> > > > > raised by
> > > > > > >> > > Sophie and others around topology updating. I would like to
> > > > > propose
> > > > > > >> > > the following changes to the KIP to solidify the story around
> > > > > topology
> > > > > > >> > > updating.
> > > > > > >> > >
> > > > > > >> > > To recap, a member sends its topology metadata every time it
> > > > > attempts
> > > > > > >> > > to join a streams group. When the group already has a 
> > > > > > >> > > topology
> > > > > set,
> > > > > > >> > > the KIP proposes to update the topology for the group
> > > immediately
> > > > > with
> > > > > > >> > > that first heartbeat of a member. All other members will then
> > > > > receive
> > > > > > >> > > a non-fatal group status INCONSISTENT_TOPOLOGY, letting them
> > > know
> > > > > that
> > > > > > >> > > their topology is not equal to the groups topology anymore.
> > > Those
> > > > > > >> > > members with an outdated topology will be able to retain 
> > > > > > >> > > their
> > > > > current
> > > > > > >> > > tasks, but will not receive new tasks, until they have also
> > > > > updated
> > > > > > >> > > their topology. This enables a rolling bounce to upgrade the
> > > > > topology.
> > > > > > >> > >
> > > > > > >> > > There is a concern with this behavior, raised both in our
> > > > > discussions
> > > > > > >> > > but also by Guozhang and Sophie. The problem is that a
> > > > > “misbehaving”
> > > > > > >> > > member can always roll back a topology upgrade, if it is 
> > > > > > >> > > still
> > > > > running
> > > > > > >> > > with the old version of the topology. For example, if we roll
> > > 20
> > > > > > >> > > members to a new topology version, and we have successfully
> > > > > restarted
> > > > > > >> > > 19 out of 20, but the last member is restarted with the old
> > > > > topology,
> > > > > > >> > > this will cause the topology version on the broker to be
> > > rolled
> > > > > back.
> > > > > > >> > > This could happen, for example, because topology update fails
> > > on
> > > > > the
> > > > > > >> > > last member, or the last member runs into a fatal error of
> > > some
> > > > > kind
> > > > > > >> > > (out-of-memory or being fenced, for example). Once the failed
> > > > > member
> > > > > > >> > > rejoins, it updates the topology on the broker to the “old”
> > > > > topology,
> > > > > > >> > > so performs an involuntary roll-back. Now, we would be in a
> > > > > situation
> > > > > > >> > > where 19 out of 20 members are blocked from getting new tasks
> > > > > > >> > > assigned. We would be in this state just temporary, until the
> > > > > final,
> > > > > > >> > > misbehaving member is restarted correctly with the new
> > > topology.
> > > > > > >> > >
> > > > > > >> > > The original suggestion in the KIP discussion to solve these
> > > > > issues
> > > > > > >> > > (for now) was to exclude topology updates from the KIP.
> > > However,
> > > > > it
> > > > > > >> > > was confirmed by several people (such as Anna) that topology
> > > > > updates
> > > > > > >> > > are quite common in the wild, so for having a 
> > > > > > >> > > production-ready
> > > > > > >> > > implementation of the new protocol, a mechanism for topology
> > > > > updates
> > > > > > >> > > is required. It is not clear how strict the requirement for
> > > > > rolling
> > > > > > >> > > updates in the first version of the protocol. No ‘better’
> > > > > alternative
> > > > > > >> > > for updating topologies was suggested.
> > > > > > >> > >
> > > > > > >> > > Proposal 1) Making topology updating optional: We modify the
> > > > > heartbeat
> > > > > > >> > > request to allow joining a group without sending a topology
> > > (only
> > > > > a
> > > > > > >> > > topology ID). When joining a group this way, the member is
> > > > > explicitly
> > > > > > >> > > requesting to not update the topology of the group. In other
> > > > > words,
> > > > > > >> > > leaving the topology as null, will mean that no topology
> > > update
> > > > > will
> > > > > > >> > > be performed on the broker side. If the topology ID does not
> > > > > match the
> > > > > > >> > > topology ID of the group, and no topology update is 
> > > > > > >> > > performed,
> > > > > and the
> > > > > > >> > > member that joined will just behave like any other member
> > > with an
> > > > > > >> > > inconsistent topology ID. That is, it will not get any tasks
> > > > > assigned.
> > > > > > >> > > Streams clients will never send the topology when they know
> > > they
> > > > > are
> > > > > > >> > > just rejoining the group, e.g. because the client was fenced
> > > and
> > > > > > >> > > dropped out of the group. That means, a topology update is
> > > only
> > > > > > >> > > initiated when a streams application is first started up. 
> > > > > > >> > > This
> > > > > will
> > > > > > >> > > prevent "accidental" topology downgrades if we know that we
> > > are
> > > > > > >> > > rejoining the group. However, if we rejoin the group without
> > > > > knowing
> > > > > > >> > > that we are just rejoining (e.g. since the whole pod was
> > > > > restarted),
> > > > > > >> > > we could still run into temporary topology downgrades.
> > > > > > >> > >
> > > > > > >> > > Proposal 2) Disable topology updates by default on the 
> > > > > > >> > > client:
> > > > > > >> > > Second, we can disable topology updates by default from the
> > > client
> > > > > > >> > > side, by extending the streams configuration with a config
> > > > > > >> > > enable_topology_updates which defaults to false and extending
> > > the
> > > > > > >> > > heartbeat RPC by a flag UpdateTopology which propagates that
> > > > > config
> > > > > > >> > > value to the broker. The advantage of having a client-side
> > > config
> > > > > is,
> > > > > > >> > > that, during a roll, I can set the configuration only for
> > > updated
> > > > > > >> > > clients, which can ensure that no accidental rollbacks
> > > happen. The
> > > > > > >> > > downside is that after the roll, I would have to do a second
> > > roll
> > > > > to
> > > > > > >> > > disable enable_topology_updates again, so that we are safe
> > > during
> > > > > the
> > > > > > >> > > next roll again. However, avoiding the second roll is more of
> > > a
> > > > > > >> > > usability optimization, which is conceptually more complex 
> > > > > > >> > > and
> > > > > could
> > > > > > >> > > address in a follow-up KIP, as in proposal 3.
> > > > > > >> > >
> > > > > > >> > > Proposal 3) Fixing the topology ID during upgrades (follow-up
> > > > > KIP): To
> > > > > > >> > > fully prevent accidental downgrades, one could introduce a
> > > > > broker-side
> > > > > > >> > > group-level config that restricts the possible topology IDs
> > > that a
> > > > > > >> > > client can upgrade to. This would be quite similar in safety
> > > as
> > > > > making
> > > > > > >> > > topology updates manual. For updating a topology, the user
> > > would
> > > > > have
> > > > > > >> > > to first, extract the topology ID from the update topology -
> > > this
> > > > > > >> > > could e.g. be logged when the application is started, second,
> > > set
> > > > > the
> > > > > > >> > > group-level config using an admin client (something like
> > > > > > >> > > group.streams.allowed_upgrade_topology_id), third, roll the
> > > > > > >> > > application. This would require quite some extra tooling (how
> > > to
> > > > > get
> > > > > > >> > > the topology ID), so I’d propose introducing this in a
> > > follow-up
> > > > > KIP.
> > > > > > >> > >
> > > > > > >> > > Let me know if there are any concerns with this approach.
> > > > > > >> > >
> > > > > > >> > > Cheers,
> > > > > > >> > > Lucas
> > > > > > >> > >
> > > > > > >> > > On Mon, Oct 7, 2024 at 9:42 AM Bruno Cadonna <
> > > cado...@apache.org>
> > > > > > >> wrote:
> > > > > > >> > >>
> > > > > > >> > >> Hi all,
> > > > > > >> > >>
> > > > > > >> > >> we did some major changes to this KIP that we would like the
> > > > > > >> community
> > > > > > >> > >> to review. The changes are the following:
> > > > > > >> > >>
> > > > > > >> > >> 1. We merged the Streams group initialize RPC into the
> > > Streams
> > > > > group
> > > > > > >> > >> heartbeat RPC. We decided to merge them because it
> > > simplifies the
> > > > > > >> > >> sychronization between initialization and heartbeat during
> > > group
> > > > > > >> > >> creation and migration from a classic group. A consequence
> > > of the
> > > > > > >> merge
> > > > > > >> > >> is that the heartbeat now needs permissions to create and
> > > > > describe
> > > > > > >> > >> topics which before only the initialize call had. But we
> > > think
> > > > > that
> > > > > > >> is
> > > > > > >> > >> not a big deal. A consumer of a Streams client will send the
> > > > > > >> metadata of
> > > > > > >> > >> its topology in its first heartbeat when it joins the group.
> > > > > After
> > > > > > >> that
> > > > > > >> > >> it will not send the topology metadata anymore.
> > > > > > >> > >>
> > > > > > >> > >> 2. We discovered that to create the internal topics, the
> > > topology
> > > > > > >> > >> metadata also need to include the co-partition groups since
> > > we
> > > > > cannot
> > > > > > >> > >> infer the co-partitioning solely from the sub-topologies and
> > > the
> > > > > > >> input
> > > > > > >> > >> topics of the sub-topologies.
> > > > > > >> > >>
> > > > > > >> > >> 3. We added a section that describes how the migration from 
> > > > > > >> > >> a
> > > > > classic
> > > > > > >> > >> group to a streams group and back works. This is very
> > > similar to
> > > > > how
> > > > > > >> the
> > > > > > >> > >> migration in KIP-848 works.
> > > > > > >> > >>
> > > > > > >> > >>
> > > > > > >> > >> Please give us feedback about the changes and the KIP in
> > > general.
> > > > > > >> > >>
> > > > > > >> > >> We would like to start a vote on this KIP as soon as
> > > possible.
> > > > > > >> > >>
> > > > > > >> > >> Best,
> > > > > > >> > >> Bruno
> > > > > > >> > >>
> > > > > > >> > >> On 11.09.24 01:06, Sophie Blee-Goldman wrote:
> > > > > > >> > >>> Just following up to say thank you Lucas for the detailed
> > > > > > >> > explanations, and
> > > > > > >> > >>> especially the thorough response to my more "existential"
> > > > > questions
> > > > > > >> > about
> > > > > > >> > >>> building off 848 vs introducing a separate Streams group
> > > > > protocol.
> > > > > > >> This
> > > > > > >> > >>> really helped me understand the motivations behind this
> > > > > decision. No
> > > > > > >> > notes!
> > > > > > >> > >>>
> > > > > > >> > >>> On Fri, Sep 6, 2024 at 7:17 AM Lucas Brutschy
> > > > > > >> > >>> <lbruts...@confluent.io.invalid> wrote:
> > > > > > >> > >>>
> > > > > > >> > >>>> Hi Sophie,
> > > > > > >> > >>>>
> > > > > > >> > >>>> thanks for getting deeply involved in this discussion.
> > > > > > >> > >>>>
> > > > > > >> > >>>> S16. Adding tagged fields would not require an RPC version
> > > > > bump at
> > > > > > >> > >>>> all. This should already a cover a lot of use cases where
> > > > > > >> requestion
> > > > > > >> > >>>> versioning wouldn't become an issue, as long as it is safe
> > > to
> > > > > > >> ignore
> > > > > > >> > >>>> the new fields. Other than that, I'm not aware of a formal
> > > > > > >> definition
> > > > > > >> > >>>> of the what request version bumps mean in relation to
> > > Apache
> > > > > Kafka
> > > > > > >> > >>>> versions - if anybody knows, please let me know. In my
> > > > > > >> understanding,
> > > > > > >> > >>>> the Kafka protocol evolves independently of AK versions,
> > > and
> > > > > for
> > > > > > >> the
> > > > > > >> > >>>> Kafka protocol to change, you require an accepted KIP, and
> > > not
> > > > > an
> > > > > > >> AK
> > > > > > >> > >>>> release. The grey area is that a protocol change without
> > > an AK
> > > > > > >> release
> > > > > > >> > >>>> does not mean much, because even if cloud vendors deploy 
> > > > > > >> > >>>> it
> > > > > before
> > > > > > >> an
> > > > > > >> > >>>> AK release, virtually no clients would support it and the
> > > > > protocol
> > > > > > >> > >>>> would fall back to the earlier RPC version. So maybe this
> > > isn't
> > > > > > >> that
> > > > > > >> > >>>> clearly formalized because it does not matter much in
> > > practice.
> > > > > > >> > >>>> Normally, I would expect an RPC version bump after each 
> > > > > > >> > >>>> KIP
> > > > > that
> > > > > > >> > >>>> breaks RPC compatibility. But there seems to be some
> > > > > flexibility
> > > > > > >> here
> > > > > > >> > >>>> - for example, the KIP-848 RPCs were changed after the
> > > fact in
> > > > > > >> trunk,
> > > > > > >> > >>>> because they were not exposed yet by default in AK (or any
> > > > > other
> > > > > > >> > >>>> implementation of the Kafka protocol), so a breaking 
> > > > > > >> > >>>> change
> > > > > without
> > > > > > >> > >>>> request version bump was probably not deemed problematic.
> > > > > > >> > >>>>
> > > > > > >> > >>>> S17/18. We can add an informational table for the
> > > interface to
> > > > > the
> > > > > > >> > >>>> assignor, and that should also make clear how a 
> > > > > > >> > >>>> client-side
> > > > > > >> assignment
> > > > > > >> > >>>> interface would look. Generally, our idea would be to send
> > > all
> > > > > the
> > > > > > >> > >>>> input information for the task assignor to a client and
> > > expect
> > > > > all
> > > > > > >> the
> > > > > > >> > >>>> output information of a task assignor in return, very
> > > similar
> > > > > to
> > > > > > >> > >>>> KIP-848. We did not include such a table because we do not
> > > > > define a
> > > > > > >> > >>>> public interface to define custom broker-side assignors in
> > > this
> > > > > > >> KIP.
> > > > > > >> > >>>> But generally, the input / output of the (task) assignor
> > > mostly
> > > > > > >> > >>>> follows the data fields sent to / from the brokers. Let me
> > > do
> > > > > that,
> > > > > > >> > >>>> but first I'll respond to the rest of your questions that
> > > seem
> > > > > more
> > > > > > >> > >>>> important.
> > > > > > >> > >>>>
> > > > > > >> > >>>> S19. I think an explicit trigger for rebalance could make
> > > > > sense,
> > > > > > >> but
> > > > > > >> > >>>> let me explain the thinking why we haven't included this
> > > in the
> > > > > > >> > >>>> current KIP: We don't want to include it for client-side
> > > > > assignment
> > > > > > >> > >>>> obviously, because this KIP does not include client-side
> > > > > > >> assignment.
> > > > > > >> > >>>> For triggering a rebalance when a warm-up has caught up,
> > > this
> > > > > is
> > > > > > >> > >>>> treated differently in the KIP: the task assignor is 
> > > > > > >> > >>>> called
> > > > > when a
> > > > > > >> > >>>> client sends updated task offsets. The client does not
> > > update
> > > > > its
> > > > > > >> task
> > > > > > >> > >>>> offsets regularly, but according to an interval
> > > > > > >> > >>>> `task.offset.interval.ms`, but also immediately when a
> > > client
> > > > > > >> notices
> > > > > > >> > >>>> that a warm-up has caught up. So effectively, it is mostly
> > > the
> > > > > > >> client
> > > > > > >> > >>>> that is constantly checking the lag. I absolutely believe
> > > that
> > > > > the
> > > > > > >> > >>>> client should be checking for `acceptable.recovery.lag`,
> > > and
> > > > > we are
> > > > > > >> > >>>> using the presence of the task offset field to trigger
> > > > > > >> reassignment.
> > > > > > >> > >>>> What we weren't sure is how often we need the task offsets
> > > to
> > > > > be
> > > > > > >> > >>>> updated and how often we want to retrigger the assignment
> > > > > because
> > > > > > >> of
> > > > > > >> > >>>> that. An explicit field for "forcing" a reassignment may
> > > still
> > > > > make
> > > > > > >> > >>>> sense, so that we can update the task offsets on the 
> > > > > > >> > >>>> broker
> > > > > more
> > > > > > >> often
> > > > > > >> > >>>> than we call the assignor. However, I'm not convinced that
> > > this
> > > > > > >> should
> > > > > > >> > >>>> just be a generic "rebalance trigger", as this seems to be
> > > > > going
> > > > > > >> > >>>> against the goals of KIP-848. Generally, I think one of 
> > > > > > >> > >>>> the
> > > > > > >> principles
> > > > > > >> > >>>> of KIP-848 is that a member just describes its current
> > > state
> > > > > and
> > > > > > >> > >>>> metadata, and the group coordinator makes a decision on
> > > whether
> > > > > > >> > >>>> reassignment needs to be triggered. So it would seem more
> > > > > natural
> > > > > > >> to
> > > > > > >> > >>>> just indicate a boolean "I have caught-up warm-up tasks"
> > > to the
> > > > > > >> group
> > > > > > >> > >>>> coordinator. I understand that for client-side assignment,
> > > you
> > > > > may
> > > > > > >> > >>>> want to trigger assignment explicitely without the group
> > > > > > >> coordinator
> > > > > > >> > >>>> understanding why, but then we should add such a field as
> > > part
> > > > > of
> > > > > > >> the
> > > > > > >> > >>>> client-side assignment KIP.
> > > > > > >> > >>>>
> > > > > > >> > >>>> S20. Yes, inside the group coordinator, the main
> > > differences
> > > > > are
> > > > > > >> > >>>> topology initialization handling, and the semantics of 
> > > > > > >> > >>>> task
> > > > > > >> assignment
> > > > > > >> > >>>> vs. partition assignment. It will not be a completely
> > > separate
> > > > > > >> group
> > > > > > >> > >>>> coordinator, and we will not change things like the offset
> > > > > reading
> > > > > > >> and
> > > > > > >> > >>>> writing. What will have its own code path is the handling
> > > of
> > > > > > >> > >>>> heartbeats and the reconciliation of the group (so, which
> > > > > tasks are
> > > > > > >> > >>>> sent to which member at which point in time until we reach
> > > the
> > > > > > >> > >>>> declarative target assignment that was produced by a
> > > > > client-side or
> > > > > > >> > >>>> server-side assignor). Also, the assignor implementations
> > > will
> > > > > > >> > >>>> obviously be completely separate from the consumer group
> > > > > assignors,
> > > > > > >> > >>>> but that is already the case currently. Streams groups 
> > > > > > >> > >>>> will
> > > > > > >> co-exist
> > > > > > >> > >>>> in the group coordinator with share groups (for queues),
> > > > > classic
> > > > > > >> > >>>> groups (old generic protocol for consumers and connect) 
> > > > > > >> > >>>> and
> > > > > > >> consumer
> > > > > > >> > >>>> groups (from KIP-848), and potentially a new group type 
> > > > > > >> > >>>> for
> > > > > > >> connect as
> > > > > > >> > >>>> well. Each of these group types have a separate code path
> > > for
> > > > > heart
> > > > > > >> > >>>> beating, reconciling assignments, keeping track of the
> > > internal
> > > > > > >> state
> > > > > > >> > >>>> of the group and responding to specific requests from 
> > > > > > >> > >>>> admin
> > > > > API,
> > > > > > >> like
> > > > > > >> > >>>> ConsumerGroupDescribe. For streams, there will be a 
> > > > > > >> > >>>> certain
> > > > > amount
> > > > > > >> of
> > > > > > >> > >>>> duplication from consumer groups - for example, liveness
> > > > > tracking
> > > > > > >> for
> > > > > > >> > >>>> members (setting timers for the session timeout etc.) is
> > > > > > >> essentially
> > > > > > >> > >>>> the same between both groups, and the reconciliation logic
> > > is
> > > > > quite
> > > > > > >> > >>>> similar. We will make an effort to refactor as much of 
> > > > > > >> > >>>> this
> > > > > code
> > > > > > >> in a
> > > > > > >> > >>>> way that it can be reused by both group types, and a lot 
> > > > > > >> > >>>> of
> > > > > things
> > > > > > >> > >>>> have already been made generic with the introduction of
> > > share
> > > > > > >> groups
> > > > > > >> > >>>> in the group coordinator.
> > > > > > >> > >>>>
> > > > > > >> > >>>> S21. It's nice that we are discussing "why are we doing
> > > this"
> > > > > in
> > > > > > >> > >>>> question 21 :D. I think we are touching upon the reasoning
> > > in
> > > > > the
> > > > > > >> KIP,
> > > > > > >> > >>>> but let me explain the reasoning in more detail. You are
> > > > > right, the
> > > > > > >> > >>>> reason we want to implement custom RPCs for Kafka Streams
> > > is
> > > > > not
> > > > > > >> about
> > > > > > >> > >>>> broker-side vs. client-side assignment. The broker could
> > > > > implement
> > > > > > >> the
> > > > > > >> > >>>> Member Metadata versioning scheme of Kafka Streams,
> > > intercept
> > > > > > >> consumer
> > > > > > >> > >>>> group heartbeats that look like Kafka Streams heartbeats,
> > > > > > >> deserialize
> > > > > > >> > >>>> the custom metadata and run a broker-side assignor for
> > > Kafka
> > > > > > >> Streams,
> > > > > > >> > >>>> and finally serialize custom Kafka Streams assignment
> > > > > metadata, all
> > > > > > >> > >>>> based on KIP-848 RPCs.
> > > > > > >> > >>>>
> > > > > > >> > >>>> First of all, the main difference we see with custom RPCs
> > > is
> > > > > really
> > > > > > >> > >>>> that streams groups become a first-level concept for both
> > > the
> > > > > > >> broker
> > > > > > >> > >>>> and clients of the Kafka. This also goes into the answer
> > > for
> > > > > S20
> > > > > > >> (what
> > > > > > >> > >>>> is different beside topology initialization and task
> > > > > > >> reconciliation).
> > > > > > >> > >>>> With streams groups, I can use the Admin API call to see
> > > the
> > > > > > >> current
> > > > > > >> > >>>> assignment, the target assignment, all the metadata
> > > necessary
> > > > > for
> > > > > > >> > >>>> assignment, and some information about the current
> > > topology in
> > > > > a
> > > > > > >> > >>>> human-readable and machine-readable form, instead of
> > > having to
> > > > > > >> parse
> > > > > > >> > >>>> it from logs on some client machine that may or may not
> > > have
> > > > > the
> > > > > > >> right
> > > > > > >> > >>>> log-level configured. This will help humans to understand
> > > what
> > > > > is
> > > > > > >> > >>>> going on, but can also power automated tools / CLIs / UIs,
> > > and
> > > > > I
> > > > > > >> think
> > > > > > >> > >>>> this is a major improvement for operating a streams
> > > > > application.
> > > > > > >> > >>>> Again, this is not about who is in control of the broker
> > > or the
> > > > > > >> > >>>> clients, the information becomes accessible for everybody
> > > who
> > > > > can
> > > > > > >> call
> > > > > > >> > >>>> the Admin API. It also does not depend on the cloud
> > > vendor, it
> > > > > > >> will be
> > > > > > >> > >>>> an improvement available for pure AK users, and will also
> > > work
> > > > > with
> > > > > > >> > >>>> client-side assignment (although obviously, if there is an
> > > > > > >> intention
> > > > > > >> > >>>> to start encoding proprietary data in black-box
> > > byte-arrays,
> > > > > this
> > > > > > >> > >>>> information remains hidden to users of the open source
> > > > > distribution
> > > > > > >> > >>>> again).
> > > > > > >> > >>>>
> > > > > > >> > >>>> I also think that the original proposal of reusing the new
> > > > > consumer
> > > > > > >> > >>>> group protocol for Kafka Streams has some downsides, that,
> > > > > while
> > > > > > >> not
> > > > > > >> > >>>> being deal-breakers, would make the new behaviour of Kafka
> > > > > Streams
> > > > > > >> > >>>> less than ideal. One example is the reconciliation: if we
> > > have
> > > > > a
> > > > > > >> > >>>> custom stream partition assignor (client- or broker-side
> > > > > doesn't
> > > > > > >> > >>>> matter), we can only control the target assignment that 
> > > > > > >> > >>>> the
> > > > > group
> > > > > > >> > >>>> converges to, but none of the intermediate states that the
> > > > > group
> > > > > > >> > >>>> transitions through while attempting to reach the desired
> > > end
> > > > > > >> state.
> > > > > > >> > >>>> For example, there is no way to prevent the reconciliation
> > > > > loop of
> > > > > > >> > >>>> KIP-848 from giving an active task to processX before
> > > another
> > > > > > >> instance
> > > > > > >> > >>>> on processX has revoked the corresponding standby task.
> > > There
> > > > > is no
> > > > > > >> > >>>> synchronization point where these movements are done at 
> > > > > > >> > >>>> the
> > > > > same
> > > > > > >> time
> > > > > > >> > >>>> as in the classic protocol, and on the broker we can't
> > > prevent
> > > > > > >> this,
> > > > > > >> > >>>> because the reconciliation loop in KIP-848 does not know
> > > what a
> > > > > > >> > >>>> standby task is, or what a process ID is. You could argue,
> > > you
> > > > > can
> > > > > > >> let
> > > > > > >> > >>>> the application run into LockExceptions in the meantime,
> > > but it
> > > > > > >> will
> > > > > > >> > >>>> be one of the things that will make Kafka Streams work 
> > > > > > >> > >>>> less
> > > > > than
> > > > > > >> > >>>> ideal. Another example is task offsets. It is beneficial
> > > for
> > > > > > >> streams
> > > > > > >> > >>>> partition assignors to have relatively recent task offset
> > > > > > >> information,
> > > > > > >> > >>>> which, in KIP-848, needs to be sent to the assignor using 
> > > > > > >> > >>>> a
> > > > > > >> black-box
> > > > > > >> > >>>> MemberMetadata byte array. This member metadata byte array
> > > is
> > > > > > >> > >>>> persisted to the consumer offset topic every time it
> > > changes
> > > > > (for
> > > > > > >> > >>>> failover). So we end up with a bad trade-off: if we send
> > > the
> > > > > task
> > > > > > >> > >>>> offset information to the broker very frequently, we will
> > > have
> > > > > to
> > > > > > >> > >>>> write those member metadata records very frequently as
> > > well.
> > > > > If we
> > > > > > >> do
> > > > > > >> > >>>> not send them frequently, streams task assignors will have
> > > very
> > > > > > >> > >>>> outdated lag information. In KIP-1071 we can make the
> > > decision
> > > > > to
> > > > > > >> not
> > > > > > >> > >>>> persist the task offset information, which avoids this
> > > > > trade-off.
> > > > > > >> It
> > > > > > >> > >>>> gives us up-to-date lag information in the usual case, but
> > > > > after
> > > > > > >> > >>>> fail-over of the group coordinator, our lag information
> > > may be
> > > > > > >> missing
> > > > > > >> > >>>> for a short period of time. These are two examples, but I
> > > > > think the
> > > > > > >> > >>>> general theme is, that with custom RPCs, Kafka Streams is
> > > in
> > > > > > >> control
> > > > > > >> > >>>> of how the group metadata is processed and how the tasks
> > > are
> > > > > > >> > >>>> reconciled. This is not just a consideration for now, but
> > > also
> > > > > the
> > > > > > >> > >>>> future: will KIP-848 reconciliation and metadata 
> > > > > > >> > >>>> management
> > > > > always
> > > > > > >> > >>>> work "good enough" for Kafka Streams, or will there be use
> > > > > cases
> > > > > > >> where
> > > > > > >> > >>>> we want to evolve and customize the behaviour of Kafka
> > > Streams
> > > > > > >> groups
> > > > > > >> > >>>> independently of consumer groups.
> > > > > > >> > >>>>
> > > > > > >> > >>>> A final point that I would like to make, I don't think 
> > > > > > >> > >>>> that
> > > > > custom
> > > > > > >> > >>>> streams RPCs are inherently a lot harder to do for the
> > > Kafka
> > > > > > >> Streams
> > > > > > >> > >>>> community than "just utilizing" KIP-848, at least not as
> > > much
> > > > > as
> > > > > > >> you
> > > > > > >> > >>>> would think. Many things in KIP-848, such customizable
> > > > > assignment
> > > > > > >> > >>>> metadata with a metadata versioning scheme are made to 
> > > > > > >> > >>>> look
> > > > > like
> > > > > > >> > >>>> general-purpose mechanisms, but are effectively mechanisms
> > > for
> > > > > > >> Kafka
> > > > > > >> > >>>> Streams and also completely unimplemented at this point in
> > > any
> > > > > > >> > >>>> implementation of the Kafka protocol. We know that there 
> > > > > > >> > >>>> is
> > > > > very
> > > > > > >> > >>>> little use of custom partition assignors in Kafka besides
> > > Kafka
> > > > > > >> > >>>> Streams, and we can expect the same to be true of these 
> > > > > > >> > >>>> new
> > > > > > >> > >>>> "customizable" features of KIP-848. So, besides the Kafka
> > > > > Streams
> > > > > > >> > >>>> community, there is no one driving the implementation of
> > > these
> > > > > > >> > >>>> features.  "Just utilizing" KIP-848 for Kafka Streams 
> > > > > > >> > >>>> would
> > > > > > >> > >>>> effectively mean to implement all this, and still make a
> > > lot of
> > > > > > >> deep
> > > > > > >> > >>>> changes to Kafka Streams (as mechanisms like probing
> > > rebalances
> > > > > > >> etc.
> > > > > > >> > >>>> don't make sense in KIP-848). I think adapting KIP-848 to
> > > work
> > > > > with
> > > > > > >> > >>>> Kafka Streams either way is a big effort. Piggybacking on
> > > > > consumer
> > > > > > >> > >>>> group RPCs and extending the consumer group protocol to 
> > > > > > >> > >>>> the
> > > > > point
> > > > > > >> that
> > > > > > >> > >>>> it can carry Kafka Streams on its back would not make
> > > things
> > > > > > >> > >>>> significantly easier. Some things, like online migration
> > > from
> > > > > the
> > > > > > >> old
> > > > > > >> > >>>> protocol to the new protocol or reimplementing metadata
> > > > > versioning
> > > > > > >> on
> > > > > > >> > >>>> top of KIP-848, may even be harder in KIP-848, than in
> > > > > KIP-1071. I
> > > > > > >> > >>>> don't think we can consider consumer group RPCs as a
> > > shortcut
> > > > > to
> > > > > > >> > >>>> revamp the streams rebalance protocol, but should be
> > > focussing
> > > > > on
> > > > > > >> what
> > > > > > >> > >>>> makes Kafka Streams work best down the road.
> > > > > > >> > >>>>
> > > > > > >> > >>>> Hope that makes sense!
> > > > > > >> > >>>>
> > > > > > >> > >>>> Cheers,
> > > > > > >> > >>>> Lucas
> > > > > > >> > >>>>
> > > > > > >> > >>>> On Fri, Sep 6, 2024 at 6:47 AM Sophie Blee-Goldman
> > > > > > >> > >>>> <sop...@responsive.dev> wrote:
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> Thanks for another detailed response Lucas! Especially
> > > w.r.t
> > > > > how
> > > > > > >> the
> > > > > > >> > >>>> epochs
> > > > > > >> > >>>>> are defined. I also went back and re-read KIP-848 to
> > > refresh
> > > > > > >> myself,
> > > > > > >> > I
> > > > > > >> > >>>>> guess it wasn't clear to me how much of the next-gen
> > > consumer
> > > > > > >> > protocol we
> > > > > > >> > >>>>> are reusing vs what's being built from the ground up.
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> S16.
> > > > > > >> > >>>>>
> > > > > > >> > >>>>>>    It will become interesting when we want to include
> > > extra
> > > > > data
> > > > > > >> > after
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>> initial protocol definition.
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> I guess that's what I'm asking us to define -- how do we
> > > > > evolve
> > > > > > >> the
> > > > > > >> > >>>> schema
> > > > > > >> > >>>>> when expanding the protocol? I think we need to set a few
> > > > > public
> > > > > > >> > >>>> contracts
> > > > > > >> > >>>>> here, such as whether/which versions get bumped when. For
> > > > > example
> > > > > > >> if
> > > > > > >> > I
> > > > > > >> > >>>>> add/modify fields twice within a single Kafka release, do
> > > we
> > > > > still
> > > > > > >> > need
> > > > > > >> > >>>> to
> > > > > > >> > >>>>> update the version? I think the answer is "yes" since I
> > > know
> > > > > you
> > > > > > >> > >>>> Confluent
> > > > > > >> > >>>>> folks upgrade brokers more often than AK releases, but
> > > this is
> > > > > > >> > different
> > > > > > >> > >>>>> from how eg the SubscriptionInfo/AssignmentInfo of the
> > > > > > >> > >>>>> StreamsPartitionAssignor works today, hence I want to put
> > > all
> > > > > > >> that in
> > > > > > >> > >>>>> writing (for those of us who aren't familiar with how all
> > > this
> > > > > > >> works
> > > > > > >> > >>>>> already and may be modifying it in the future, say, to 
> > > > > > >> > >>>>> add
> > > > > > >> > client-side
> > > > > > >> > >>>>> assignment :P)
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> Maybe a quick example of what to do to evolve this schema
> > > is
> > > > > > >> > sufficient?
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> S17.
> > > > > > >> > >>>>> The KIP seems to throw the assignor, group coordinator,
> > > topic
> > > > > > >> > >>>>> initialization, and topology versioning all into a single
> > > > > black
> > > > > > >> box
> > > > > > >> > on
> > > > > > >> > >>>> the
> > > > > > >> > >>>>> broker. I understand much of this is intentionally being
> > > > > glossed
> > > > > > >> > over as
> > > > > > >> > >>>> an
> > > > > > >> > >>>>> implementation detail since it's a KIP, but it would
> > > really
> > > > > help
> > > > > > >> to
> > > > > > >> > tease
> > > > > > >> > >>>>> apart these distinct players and define their
> > > interactions.
> > > > > For
> > > > > > >> > example,
> > > > > > >> > >>>>> what are the inputs and outputs to the assignor? For
> > > example
> > > > > > >> could we
> > > > > > >> > >>>> get a
> > > > > > >> > >>>>> table like the "Data Model
> > > > > > >> > >>>>> <
> > > > > > >> > >>>>
> > > > > > >> >
> > > > > > >>
> > > > >
> > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel
> > > > > > >> > >>>>> "
> > > > > > >> > >>>>> in 848?
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> KIP-848 goes into detail on these things, but again, it's
> > > > > unclear
> > > > > > >> > what
> > > > > > >> > >>>>> parts of 848 are and aren't supposed to be carried
> > > through.
> > > > > For
> > > > > > >> > example
> > > > > > >> > >>>> 848
> > > > > > >> > >>>>> has a concept of assignor "reasons" and client-side
> > > assignors
> > > > > and
> > > > > > >> > >>>> assignor
> > > > > > >> > >>>>> metadata which can all trigger a rebalance/group epoch
> > > bump.
> > > > > > >> However
> > > > > > >> > in
> > > > > > >> > >>>> the
> > > > > > >> > >>>>> current plan for 1071 none of these exist. Which leads me
> > > to
> > > > > my
> > > > > > >> next
> > > > > > >> > >>>>> point...
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> S18.
> > > > > > >> > >>>>> Many people use a managed Kafka service like Confluent
> > > Cloud
> > > > > where
> > > > > > >> > >>>> brokers
> > > > > > >> > >>>>> are on the bleeding edge but clients are upgraded less
> > > > > frequently.
> > > > > > >> > >>>> However,
> > > > > > >> > >>>>> the opposite is also true for a great many users: client
> > > > > > >> > applications are
> > > > > > >> > >>>>> easy to upgrade and bumped often to get new features,
> > > whereas
> > > > > the
> > > > > > >> > brokers
> > > > > > >> > >>>>> (often run by a different team altogether) are considered
> > > > > riskier
> > > > > > >> to
> > > > > > >> > >>>>> upgrade and stay on one version for a long time. We can
> > > argue
> > > > > > >> about
> > > > > > >> > which
> > > > > > >> > >>>>> case is more common -- in my personal experience it's
> > > > > actually the
> > > > > > >> > >>>> latter,
> > > > > > >> > >>>>> although I only realized this after my time at Confluent
> > > > > which was
> > > > > > >> > >>>>> obviously skewed towards meeting Confluent Cloud users --
> > > but
> > > > > > >> > regardless,
> > > > > > >> > >>>>> we obviously need to support both cases as first-class
> > > > > citizens.
> > > > > > >> And
> > > > > > >> > I'm
> > > > > > >> > >>>> a
> > > > > > >> > >>>>> bit concerned that there's not enough in the KIP to 
> > > > > > >> > >>>>> ensure
> > > > > that
> > > > > > >> > >>>> client-side
> > > > > > >> > >>>>> assignment won't end up getting hacked into the protocol
> > > as an
> > > > > > >> > >>>> afterthought.
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> Don't get me wrong, I'm still not demanding you guys
> > > implement
> > > > > > >> > >>>> client-side
> > > > > > >> > >>>>> assignors as part of this KIP. I'm happy to leave the
> > > > > > >> implementation
> > > > > > >> > of
> > > > > > >> > >>>>> that to a followup, but I feel we need to flesh out the
> > > basic
> > > > > > >> > direction
> > > > > > >> > >>>> for
> > > > > > >> > >>>>> this right now, as part of the v0 Streams protocol.
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> I'm also not asking you to do this alone, and am happy to
> > > work
> > > > > > >> > with/for
> > > > > > >> > >>>> you
> > > > > > >> > >>>>> on this. I have one possible proposal sketched out
> > > already,
> > > > > > >> although
> > > > > > >> > >>>> before
> > > > > > >> > >>>>> I share that I just wanted to make sure we're aligned on
> > > the
> > > > > > >> > fundamentals
> > > > > > >> > >>>>> and have mapped out the protocol in full first (ie S17).
> > > Just
> > > > > > >> want to
> > > > > > >> > >>>> have
> > > > > > >> > >>>>> a rough plan for how clients-side assignment will fit
> > > into the
> > > > > > >> > picture,
> > > > > > >> > >>>> and
> > > > > > >> > >>>>> right now it's difficult to define because it's unclear
> > > where
> > > > > the
> > > > > > >> > >>>>> boundaries between the group coordinator and broker-side
> > > > > assignor
> > > > > > >> > are (eg
> > > > > > >> > >>>>> what are assignor inputs and outputs).
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> I'm committed to ensuring this doesn't add extra work or
> > > > > delay to
> > > > > > >> > this
> > > > > > >> > >>>> KIP
> > > > > > >> > >>>>> for you guys.
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> S19.
> > > > > > >> > >>>>> I think it could make sense to include a concept like the
> > > > > > >> "assignor
> > > > > > >> > >>>>> reason"  from 848  as a top-level field and rebalance
> > > trigger
> > > > > (ie
> > > > > > >> > group
> > > > > > >> > >>>>> epoch bump). This would mainly be for the client-side
> > > > > assignor to
> > > > > > >> > >>>> trigger a
> > > > > > >> > >>>>> rebalance, but for another example, it seems like we want
> > > the
> > > > > > >> > brokers to
> > > > > > >> > >>>> be
> > > > > > >> > >>>>> constantly monitoring warmup tasks and processing lag
> > > updates
> > > > > to
> > > > > > >> know
> > > > > > >> > >>>> when
> > > > > > >> > >>>>> a warmup task is to be switched, which is heavy work. Why
> > > not
> > > > > just
> > > > > > >> > let
> > > > > > >> > >>>> the
> > > > > > >> > >>>>> client who owns the warmup and knows the lag decide when
> > > it
> > > > > has
> > > > > > >> > warmed up
> > > > > > >> > >>>>> and signal the broker? It's a small thing for a client to
> > > > > check
> > > > > > >> when
> > > > > > >> > its
> > > > > > >> > >>>>> task gets within the acceptable recovery lag and notify
> > > via
> > > > > the
> > > > > > >> > heartbeat
> > > > > > >> > >>>>> if so, but it seems like a rather big deal for the broker
> > > to
> > > > > be
> > > > > > >> > >>>> constantly
> > > > > > >> > >>>>> computing these checks for every task on every hb. Just
> > > food
> > > > > for
> > > > > > >> > thought,
> > > > > > >> > >>>>> I  don't really have a strong preference over where the
> > > warmup
> > > > > > >> > decision
> > > > > > >> > >>>>> happens -- simply pointing it out that it fits very 
> > > > > > >> > >>>>> neatly
> > > > > into
> > > > > > >> the
> > > > > > >> > >>>>> "assignor reason"/"rebalance requested" framework. Making
> > > > > this a
> > > > > > >> > >>>>> client-side decision also makes it easier to evolve and
> > > > > customize
> > > > > > >> > since
> > > > > > >> > >>>>> Kafka Streams users tend to have more control over
> > > > > > >> > upgrading/implementing
> > > > > > >> > >>>>> stuff in their client application vs the brokers (which 
> > > > > > >> > >>>>> is
> > > > > often a
> > > > > > >> > >>>>> different team altogether)
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> S20.
> > > > > > >> > >>>>> To what extent do we plan to reuse implementations of 848
> > > in
> > > > > > >> order to
> > > > > > >> > >>>> build
> > > > > > >> > >>>>> the new Streams group protocol? I'm still having some
> > > trouble
> > > > > > >> > >>>> understanding
> > > > > > >> > >>>>> where we are just adding stuff on top and where we plan 
> > > > > > >> > >>>>> to
> > > > > rebuild
> > > > > > >> > from
> > > > > > >> > >>>> the
> > > > > > >> > >>>>> ground up. For example we seem to share most of the epoch
> > > > > > >> > reconciliation
> > > > > > >> > >>>>> stuff but will have different triggers for the group 
> > > > > > >> > >>>>> epoch
> > > > > bump.
> > > > > > >> Is
> > > > > > >> > it
> > > > > > >> > >>>>> possible to just "plug in" the group epoch and let the
> > > > > consumer's
> > > > > > >> > group
> > > > > > >> > >>>>> coordinator figure stuff out from there? Will we have a
> > > > > completely
> > > > > > >> > >>>> distinct
> > > > > > >> > >>>>> Streams coordinator? I know this is an implementation
> > > detail
> > > > > but
> > > > > > >> imo
> > > > > > >> > it's
> > > > > > >> > >>>>> ok to get in the weeds a bit for a large and complex KIP
> > > such
> > > > > as
> > > > > > >> > this.
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> Setting aside the topology initialization stuff, is the
> > > main
> > > > > > >> > difference
> > > > > > >> > >>>>> really just in how the group coordinator will be aware of
> > > > > tasks
> > > > > > >> for
> > > > > > >> > >>>> Streams
> > > > > > >> > >>>>> groups vs partitions for consumer groups?
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> Which brings me to a high level question I feel I have to
> > > ask:
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> S21.
> > > > > > >> > >>>>> Can you go into more detail on why we have to provide
> > > Kafka
> > > > > > >> Streams
> > > > > > >> > its
> > > > > > >> > >>>> own
> > > > > > >> > >>>>> RPCs as opposed to just utilizing the protocol from
> > > KIP-848?
> > > > > The
> > > > > > >> > Rejected
> > > > > > >> > >>>>> Alternatives does address this but the main argument 
> > > > > > >> > >>>>> there
> > > > > seems
> > > > > > >> to
> > > > > > >> > be in
> > > > > > >> > >>>>> implementing a broker-side assignor. But can't you do 
> > > > > > >> > >>>>> this
> > > > > with
> > > > > > >> 848?
> > > > > > >> > And
> > > > > > >> > >>>>> given there are users who rarely upgrade their brokers
> > > just as
> > > > > > >> there
> > > > > > >> > are
> > > > > > >> > >>>>> users who rarely upgrade their clients, it seems silly to
> > > do
> > > > > all
> > > > > > >> this
> > > > > > >> > >>>> work
> > > > > > >> > >>>>> and change the fundamental direction of Kafka Streams
> > > just to
> > > > > give
> > > > > > >> > more
> > > > > > >> > >>>>> control over the assignment to the brokers.
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> For a specific example, the KIP says "Client-side
> > > assignment
> > > > > > >> makes it
> > > > > > >> > >>>> hard
> > > > > > >> > >>>>> to debug and tune the assignment logic" -- personally, I
> > > would
> > > > > > >> find
> > > > > > >> > it
> > > > > > >> > >>>>> infinitely more difficult to tune and debug assignment
> > > logic
> > > > > if it
> > > > > > >> > were
> > > > > > >> > >>>>> happening on the broker. But that's just because I work
> > > with
> > > > > the
> > > > > > >> app
> > > > > > >> > >>>> devs,
> > > > > > >> > >>>>> not the cluster operators. The point is we shouldn't
> > > design an
> > > > > > >> entire
> > > > > > >> > >>>>> protocol to give preference to one vendor or another --
> > > nobody
> > > > > > >> wants
> > > > > > >> > the
> > > > > > >> > >>>>> ASF to start yelling at us  lol D:
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> Another example, which imo is a good one (IIUC) is that 
> > > > > > >> > >>>>> "a
> > > > > simple
> > > > > > >> > >>>> parameter
> > > > > > >> > >>>>> change requires redeploying of all streams clients".
> > > That's
> > > > > > >> > definitely a
> > > > > > >> > >>>>> huge bummer, but again, why would broker-side assignors
> > > not be
> > > > > > >> able
> > > > > > >> > to
> > > > > > >> > >>>>> trigger a reassignment based on a custom config in the 
> > > > > > >> > >>>>> 848
> > > > > > >> protocol?
> > > > > > >> > We
> > > > > > >> > >>>>> could still have the ha and sticky assignors implemented
> > > on
> > > > > the
> > > > > > >> > brokers,
> > > > > > >> > >>>>> and could still define all the Streams configs as broker
> > > > > configs
> > > > > > >> to
> > > > > > >> > be
> > > > > > >> > >>>>> passed into the streams custom broker assignors. That
> > > feels
> > > > > like a
> > > > > > >> > lot
> > > > > > >> > >>>> less
> > > > > > >> > >>>>> work than redoing our own group protocol from scratch?
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> I don't want you guys to think I'm trying to block the
> > > entire
> > > > > > >> > direction
> > > > > > >> > >>>> of
> > > > > > >> > >>>>> this KIP. I'd just like to better understand your
> > > thoughts on
> > > > > this
> > > > > > >> > since
> > > > > > >> > >>>> I
> > > > > > >> > >>>>> assume you've discussed this at length internally. Just
> > > help
> > > > > me
> > > > > > >> > >>>> understand
> > > > > > >> > >>>>> why this huge proposal is justified (for my own sake and
> > > to
> > > > > > >> convince
> > > > > > >> > any
> > > > > > >> > >>>>> "others" who might be skeptical)
> > > > > > >> > >>>>>
> > > > > > >> > >>>>>
> > > > > > >> > >>>>> On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax <
> > > > > mj...@apache.org>
> > > > > > >> > wrote:
> > > > > > >> > >>>>>
> > > > > > >> > >>>>>> I still need to catch up on the discussion in general.
> > > But as
> > > > > > >> > promised,
> > > > > > >> > >>>>>> I just started KIP-1088 about the `KafkaClientSupplier`
> > > > > question.
> > > > > > >> > >>>>>>
> > > > > > >> > >>>>>> Looking forward to your feedback on the new KIP. I hope
> > > we
> > > > > can
> > > > > > >> get
> > > > > > >> > >>>>>> KIP-1088 done soon, to not block this KIP.
> > > > > > >> > >>>>>>
> > > > > > >> > >>>>>>
> > > > > > >> > >>>>>> -Matthias
> > > > > > >> > >>>>>>
> > > > > > >> > >>>>>> On 9/4/24 09:52, Lucas Brutschy wrote:
> > > > > > >> > >>>>>>> Hi Sophie,
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> thanks for the questions and comments!
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S1. KIP-1071 does not need to add public interfaces to
> > > > > streams,
> > > > > > >> but
> > > > > > >> > >>>> it
> > > > > > >> > >>>>>>> does not (yet) make the streams protocol client
> > > pluggable.
> > > > > > >> Matthias
> > > > > > >> > >>>>>>> promised to write a KIP that will propose a solution
> > > for the
> > > > > > >> > >>>>>>> KafkaClientSupplier soon. He suggested keeping it
> > > separate
> > > > > from
> > > > > > >> > this
> > > > > > >> > >>>>>>> KIP, but we can treat it as a blocker. You are right
> > > that we
> > > > > > >> could
> > > > > > >> > >>>> try
> > > > > > >> > >>>>>>> fitting an explicit topic initialization in this
> > > interface,
> > > > > if
> > > > > > >> we
> > > > > > >> > >>>>>>> decide we absolutely need it now, although I'm not sure
> > > if
> > > > > that
> > > > > > >> > would
> > > > > > >> > >>>>>>> actually define a useful workflow for the users to
> > > > > explicitly
> > > > > > >> > update
> > > > > > >> > >>>> a
> > > > > > >> > >>>>>>> topology. Explicit initialization cannot be part of
> > > admin
> > > > > tools,
> > > > > > >> > >>>>>>> because we do not have the topology there. It could
> > > > > probably be
> > > > > > >> a
> > > > > > >> > >>>>>>> config that defaults to off, or a method on the
> > > KafkaStreams
> > > > > > >> > object?
> > > > > > >> > >>>>>>> To me, explicit initialization opens a broad discussion
> > > on
> > > > > how
> > > > > > >> to
> > > > > > >> > do
> > > > > > >> > >>>>>>> it, so I would like to avoid including it in this KIP. 
> > > > > > >> > >>>>>>> I
> > > > > guess
> > > > > > >> it
> > > > > > >> > >>>> also
> > > > > > >> > >>>>>>> depends on the outcome of the discussion in S8-S10.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S2. It's hard to define the perfect default, and we
> > > should
> > > > > > >> probably
> > > > > > >> > >>>>>>> aim for safe operations first - since people can always
> > > up
> > > > > the
> > > > > > >> > limit.
> > > > > > >> > >>>>>>> I changed it to 20.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S11. I was going to write we can add it, but it's
> > > actually
> > > > > quite
> > > > > > >> > >>>>>>> independent of KIP-1071 and also a nice newbie ticket,
> > > so I
> > > > > > >> would
> > > > > > >> > >>>>>>> propose we treat it as out-of-scope and let somebody
> > > define
> > > > > it
> > > > > > >> in a
> > > > > > >> > >>>>>>> separate KIP - any discussion about it would anyway be
> > > lost
> > > > > in
> > > > > > >> this
> > > > > > >> > >>>>>>> long discussion thread.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S12. Member epoch behaves the same as in KIP-848. See
> > > the
> > > > > > >> KIP-848
> > > > > > >> > >>>>>>> explanation for basics:
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>
> > > > > > >> > >>>>
> > > > > > >> >
> > > > > > >>
> > > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> Together with the section "Reconciliation of the 
> > > > > > >> > >>>>>>> group",
> > > > > this
> > > > > > >> > should
> > > > > > >> > >>>>>>> make clear how member epochs, group epochs and
> > > assignment
> > > > > epochs
> > > > > > >> > work
> > > > > > >> > >>>>>>> in KIP-1071, but I'll try to summarize the
> > > reconciliation a
> > > > > bit
> > > > > > >> > more
> > > > > > >> > >>>>>>> explicitly below:
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> Essentially, the broker drives a reconciliation for 
> > > > > > >> > >>>>>>> each
> > > > > member
> > > > > > >> > >>>>>>> separately. The goal is to reconcile the assignment
> > > > > generated by
> > > > > > >> > the
> > > > > > >> > >>>>>>> assignor and reach the assignment epoch, but it's an
> > > > > > >> asynchronous
> > > > > > >> > >>>>>>> process, so each member may still be on an older epoch,
> > > > > which is
> > > > > > >> > the
> > > > > > >> > >>>>>>> member epoch. We do not reinvent any of the internal
> > > > > machinery
> > > > > > >> for
> > > > > > >> > >>>>>>> streams here, we are just adding standby tasks and
> > > warm-up
> > > > > > >> tasks to
> > > > > > >> > >>>>>>> the reconciliation of KIP-848.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> During reconciliation, the broker first asks the member
> > > to
> > > > > > >> revoke
> > > > > > >> > all
> > > > > > >> > >>>>>>> tasks that are not in its target assignment, by
> > > removing it
> > > > > from
> > > > > > >> > the
> > > > > > >> > >>>>>>> current assignment for the member (which is provided in
> > > the
> > > > > > >> > heartbeat
> > > > > > >> > >>>>>>> response). Until revocation is completed, the member
> > > > > remains on
> > > > > > >> the
> > > > > > >> > >>>>>>> old member epoch and in `UNREVOKED` state. Only once 
> > > > > > >> > >>>>>>> the
> > > > > member
> > > > > > >> > >>>>>>> confirms revocation of these tasks by removing them
> > > from the
> > > > > > >> set of
> > > > > > >> > >>>>>>> owned tasks in the heartbeat request, the member can
> > > > > transition
> > > > > > >> to
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>> current assignment epoch and get new tasks assigned.
> > > Once
> > > > > the
> > > > > > >> > member
> > > > > > >> > >>>>>>> has revoked all tasks that are not in its target
> > > > > assignment, it
> > > > > > >> > >>>>>>> transitions to the new epoch and gets new tasks
> > > assigned.
> > > > > There
> > > > > > >> are
> > > > > > >> > >>>>>>> some tasks that may be in the (declarative) target
> > > > > assignment of
> > > > > > >> > the
> > > > > > >> > >>>>>>> member generated by the assignor, but that may not be
> > > > > > >> immediately
> > > > > > >> > >>>>>>> added to the current assignment of the member, because
> > > they
> > > > > are
> > > > > > >> > still
> > > > > > >> > >>>>>>> "unreleased". Unreleased tasks are:
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> - A. For any task, another instance of the same process
> > > may
> > > > > own
> > > > > > >> the
> > > > > > >> > >>>>>>> task in any role (active/standby/warm-up), and we are
> > > still
> > > > > > >> > awaiting
> > > > > > >> > >>>>>>> revocation.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> - B. For an active task, an instance of any process may
> > > own
> > > > > the
> > > > > > >> > same
> > > > > > >> > >>>>>>> active task, and we are still awaiting revocation.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> As long as there is at least one unreleased task, the
> > > member
> > > > > > >> > remains
> > > > > > >> > >>>>>>> in state UNRELEASED. Once all tasks of the target
> > > > > assignment are
> > > > > > >> > >>>> added
> > > > > > >> > >>>>>>> to the current assignment of the member (because they
> > > were
> > > > > > >> > released),
> > > > > > >> > >>>>>>> the member transitions to STABLE.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> Technically, the first constraint (we cannot assign a
> > > task
> > > > > in
> > > > > > >> two
> > > > > > >> > >>>>>>> different roles to the same process) is only relevant
> > > for
> > > > > > >> stateful
> > > > > > >> > >>>>>>> tasks that use the local state directory. I wonder if 
> > > > > > >> > >>>>>>> we
> > > > > should
> > > > > > >> > relax
> > > > > > >> > >>>>>>> this restriction slightly, by marking sub-topologies
> > > that
> > > > > access
> > > > > > >> > the
> > > > > > >> > >>>>>>> local state directory. This information could also be
> > > used
> > > > > by
> > > > > > >> the
> > > > > > >> > >>>>>>> assignor. But we can also introduce as a follow-up
> > > > > improvement.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S13i. You are right, these bumps of the group epoch are
> > > "on
> > > > > > >> top" of
> > > > > > >> > >>>>>>> the KIP-848 reasons for bumping the group epoch. I
> > > added the
> > > > > > >> > KIP-848
> > > > > > >> > >>>>>>> reasons as well, because this was a bit confusing in 
> > > > > > >> > >>>>>>> the
> > > > > KIP.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S13ii. The group epoch is bumped immediately once a
> > > warm-up
> > > > > task
> > > > > > >> > has
> > > > > > >> > >>>>>>> caught up, since this is what triggers running the
> > > assignor.
> > > > > > >> > However,
> > > > > > >> > >>>>>>> the assignment epoch is only bumped once the task
> > > assignor
> > > > > has
> > > > > > >> run.
> > > > > > >> > >>>> If
> > > > > > >> > >>>>>>> the task assignment for a member hasn't changed, it 
> > > > > > >> > >>>>>>> will
> > > > > > >> transition
> > > > > > >> > >>>> to
> > > > > > >> > >>>>>>> the new assignment epoch immediately.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S13iii. In the case of a rolling bounce, the group 
> > > > > > >> > >>>>>>> epoch
> > > > > will be
> > > > > > >> > >>>>>>> bumped for every member that leaves and joins the
> > > group. For
> > > > > > >> > updating
> > > > > > >> > >>>>>>> rack ID, client tags, topology ID etc. -- this will
> > > mostly
> > > > > play
> > > > > > >> a
> > > > > > >> > >>>> role
> > > > > > >> > >>>>>>> for static membership, as you mentioned, but would
> > > cause the
> > > > > > >> same
> > > > > > >> > >>>>>>> thing - in a rolling bounce with static membership, we
> > > will
> > > > > > >> attempt
> > > > > > >> > >>>> to
> > > > > > >> > >>>>>>> recompute the assignment after every bounce, if the
> > > > > topology ID
> > > > > > >> is
> > > > > > >> > >>>>>>> changed in the members. This can cause broker load, as
> > > you
> > > > > > >> > mentioned.
> > > > > > >> > >>>>>>> Note, however, that we don't have to compute an
> > > assignment
> > > > > for
> > > > > > >> > every
> > > > > > >> > >>>>>>> group epoch bump. While KIP-848 computes its assignment
> > > as
> > > > > part
> > > > > > >> of
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>> main loop of the group coordinator, it uses 
> > > > > > >> > >>>>>>> asynchronous
> > > > > > >> assignment
> > > > > > >> > >>>> in
> > > > > > >> > >>>>>>> the case of client-side assignment, where the
> > > assignment is
> > > > > > >> slower.
> > > > > > >> > >>>> So
> > > > > > >> > >>>>>>> it will kick off the assignment on the client side,
> > > enter
> > > > > state
> > > > > > >> > >>>>>>> `ASSIGNING`, but may have more group epoch bumps in the
> > > > > meantime
> > > > > > >> > >>>> while
> > > > > > >> > >>>>>>> the assignment is computed on the client side. We leave
> > > it
> > > > > open
> > > > > > >> to
> > > > > > >> > >>>> use
> > > > > > >> > >>>>>>> the same mechanism on the broker to compute assignment
> > > > > > >> > asynchronously
> > > > > > >> > >>>>>>> on a separate thread, in case streams assignment is too
> > > > > slow for
> > > > > > >> > the
> > > > > > >> > >>>>>>> main loop of the group coordinator. We will also
> > > consider
> > > > > > >> > >>>>>>> rate-limiting the assignor to reduce broker load.
> > > However,
> > > > > it's
> > > > > > >> too
> > > > > > >> > >>>>>>> early to define this in detail, as we don't know much
> > > about
> > > > > the
> > > > > > >> > >>>>>>> performance impact yet.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S13iv. The heartbeat just sends "null" instead of the
> > > actual
> > > > > > >> value
> > > > > > >> > in
> > > > > > >> > >>>>>>> case the value didn't change since the last heartbeat,
> > > so no
> > > > > > >> > >>>>>>> comparison is required. This is defined in the 
> > > > > > >> > >>>>>>> heartbeat
> > > > > RPC and
> > > > > > >> > the
> > > > > > >> > >>>>>>> same as in KIP-848.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S13v. I think I disagree. The group epoch represents a
> > > > > version
> > > > > > >> of
> > > > > > >> > the
> > > > > > >> > >>>>>>> group state, including the current topology and the
> > > > > metadata of
> > > > > > >> all
> > > > > > >> > >>>>>>> members -- the whole input to the task assignor -- and
> > > > > defines
> > > > > > >> > >>>> exactly
> > > > > > >> > >>>>>>> the condition under which we attempt to recompute the
> > > > > > >> assignment.
> > > > > > >> > So
> > > > > > >> > >>>>>>> it's not overloaded, and has a clear definition. It's
> > > also
> > > > > > >> exactly
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>> same as in KIP-848, and we do not want to move away too
> > > far
> > > > > from
> > > > > > >> > that
> > > > > > >> > >>>>>>> protocol, unless it's something that is specific to
> > > streams
> > > > > that
> > > > > > >> > need
> > > > > > >> > >>>>>>> to be changed. Also, the protocol already uses three
> > > types
> > > > > of
> > > > > > >> > epochs
> > > > > > >> > >>>> -
> > > > > > >> > >>>>>>> member epoch, assignment epoch, group epoch. I don't
> > > think
> > > > > > >> adding
> > > > > > >> > >>>> more
> > > > > > >> > >>>>>>> epochs will make things clearer. Unless we have a 
> > > > > > >> > >>>>>>> strong
> > > > > reason
> > > > > > >> why
> > > > > > >> > >>>> we
> > > > > > >> > >>>>>>> need to add extra epochs, I'm not in favor.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S14. Assignment epoch is the same as in KIP-848:
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>
> > > > > > >> > >>>>
> > > > > > >> >
> > > > > > >>
> > > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> Its definition does not change at all, so we didn't
> > > > > describe it
> > > > > > >> in
> > > > > > >> > >>>>>>> detail in KIP-1071.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S15. I agree that the name is a bit generic, but the
> > > time to
> > > > > > >> change
> > > > > > >> > >>>>>>> this was when `group.instance.id` was introduced.
> > > Also, if
> > > > > we
> > > > > > >> > start
> > > > > > >> > >>>>>>> diverging from regular consumers / consumer groups in
> > > terms
> > > > > of
> > > > > > >> > >>>> naming,
> > > > > > >> > >>>>>>> it will not make things clearer. "application.id" and "
> > > > > group.id
> > > > > > >> "
> > > > > > >> > are
> > > > > > >> > >>>>>>> bad enough, in my eyes. I hope it's okay if I just
> > > improve
> > > > > the
> > > > > > >> > field
> > > > > > >> > >>>>>>> documentation.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> S16. This goes back to KIP-482. Essentially, in 
> > > > > > >> > >>>>>>> flexible
> > > > > request
> > > > > > >> > >>>>>>> schema versions you can define optional fields that
> > > will be
> > > > > > >> encoded
> > > > > > >> > >>>>>>> with a field tag on the wire, but can also be 
> > > > > > >> > >>>>>>> completely
> > > > > > >> omitted.
> > > > > > >> > >>>>>>> This is similar to how fields are encoded in, e.g.,
> > > > > protobuf. It
> > > > > > >> > >>>>>>> improves schema evolution, because we can add new
> > > fields to
> > > > > the
> > > > > > >> > >>>> schema
> > > > > > >> > >>>>>>> without bumping the version. There is also a minor
> > > > > difference in
> > > > > > >> > >>>>>>> encoding size: when encoded, tagged fields take up more
> > > > > bytes on
> > > > > > >> > the
> > > > > > >> > >>>>>>> wire because of the field tag, but they can be omitted
> > > > > > >> completely.
> > > > > > >> > It
> > > > > > >> > >>>>>>> will become interesting when we want to include extra
> > > data
> > > > > after
> > > > > > >> > the
> > > > > > >> > >>>>>>> initial protocol definition.
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> Cheers,
> > > > > > >> > >>>>>>> Lucas
> > > > > > >> > >>>>>>>
> > > > > > >> > >>>>>>> On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman
> > > > > > >> > >>>>>>> <sop...@responsive.dev> wrote:
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> Ah, my bad -- I thought I refreshed the page to get 
> > > > > > >> > >>>>>>>> the
> > > > > latest
> > > > > > >> > >>>> version
> > > > > > >> > >>>>>>>> which is why I was a bit confused when I couldn't find
> > > > > anything
> > > > > > >> > >>>> about
> > > > > > >> > >>>>>> the
> > > > > > >> > >>>>>>>> new tools which I had previously seen in the KIP. 
> > > > > > >> > >>>>>>>> Sorry
> > > > > for the
> > > > > > >> > >>>>>> confusion
> > > > > > >> > >>>>>>>> and unnecessary questions
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S1.
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>>> You could imagine calling the initialize RPC
> > > > > > >> > >>>>>>>>> explicitly (to implement explicit initialization), 
> > > > > > >> > >>>>>>>>> but
> > > > > this
> > > > > > >> would
> > > > > > >> > >>>>>>>>> still mean sending an event to the background thread,
> > > and
> > > > > the
> > > > > > >> > >>>>>>>>> background thread in turn invokes the RPC. However,
> > > > > explicit
> > > > > > >> > >>>>>>>>> initialization would require some additional public
> > > > > interfaces
> > > > > > >> > >>>> that we
> > > > > > >> > >>>>>>>>> are not including in this KIP.
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> I'm confused -- if we do all the group management
> > > stuff in
> > > > > a
> > > > > > >> > >>>> background
> > > > > > >> > >>>>>>>> thread to avoid needing public APIs, how do we pass
> > > all the
> > > > > > >> > >>>>>>>> Streams-specific and app-specific info to the broker
> > > for
> > > > > the
> > > > > > >> > >>>>>>>> StreamsGroupInitialize? I am guessing this is where we
> > > > > need to
> > > > > > >> > >>>> invoke
> > > > > > >> > >>>>>>>> internal APIs and is related to our discussion about
> > > > > removing
> > > > > > >> the
> > > > > > >> > >>>>>>>> KafkaClientSupplier. However, it seems to me that no
> > > > > matter how
> > > > > > >> > you
> > > > > > >> > >>>>>> look at
> > > > > > >> > >>>>>>>> it, Kafka Streams will need to pass information to and
> > > > > from the
> > > > > > >> > >>>>>> background
> > > > > > >> > >>>>>>>> thread if the background thread is to be aware of
> > > things
> > > > > like
> > > > > > >> > tasks,
> > > > > > >> > >>>>>> offset
> > > > > > >> > >>>>>>>> sums, repartition topics, etc
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> We don't really need to rehash everything here since 
> > > > > > >> > >>>>>>>> it
> > > > > seems
> > > > > > >> like
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>>> proposal to add a StreamsConsumer/StreamsAssignment
> > > > > interface
> > > > > > >> will
> > > > > > >> > >>>>>> address
> > > > > > >> > >>>>>>>> this, which I believe we're in agreement on. I just
> > > wanted
> > > > > to
> > > > > > >> > point
> > > > > > >> > >>>> out
> > > > > > >> > >>>>>>>> that this work to replace the KafkaClientSupplier is
> > > > > clearly
> > > > > > >> > >>>> intimately
> > > > > > >> > >>>>>>>> tied up with KIP-1071, and should imho be part of this
> > > KIP
> > > > > and
> > > > > > >> not
> > > > > > >> > >>>> its
> > > > > > >> > >>>>>> own
> > > > > > >> > >>>>>>>> standalone thing.
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> By the way I'm happy to hop on a call to help move
> > > things
> > > > > > >> forward
> > > > > > >> > on
> > > > > > >> > >>>>>> that
> > > > > > >> > >>>>>>>> front (or anything really). Although I guess we're 
> > > > > > >> > >>>>>>>> just
> > > > > waiting
> > > > > > >> > on a
> > > > > > >> > >>>>>> design
> > > > > > >> > >>>>>>>> right now?
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S2. I was suggesting something lower for the default
> > > upper
> > > > > > >> limit
> > > > > > >> > on
> > > > > > >> > >>>> all
> > > > > > >> > >>>>>>>> groups. I don't feel too strongly about it, just
> > > wanted to
> > > > > > >> point
> > > > > > >> > out
> > > > > > >> > >>>>>> that
> > > > > > >> > >>>>>>>> the tradeoff is not about the assignor runtime but
> > > rather
> > > > > about
> > > > > > >> > >>>> resource
> > > > > > >> > >>>>>>>> consumption, and that too many warmups could put undue
> > > > > load on
> > > > > > >> the
> > > > > > >> > >>>>>> cluster.
> > > > > > >> > >>>>>>>> In the end if we want to trust application operators
> > > then
> > > > > I'd
> > > > > > >> say
> > > > > > >> > >>>> it's
> > > > > > >> > >>>>>> fine
> > > > > > >> > >>>>>>>> to use a higher cluster-wide max like 100.
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S8-10 I'll get back to you on this in another followup
> > > > > since
> > > > > > >> I'm
> > > > > > >> > >>>> still
> > > > > > >> > >>>>>>>> thinking some things through and want to keep the
> > > > > discussion
> > > > > > >> > >>>> rolling for
> > > > > > >> > >>>>>>>> now. In the meantime I have some additional questions:
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S11. One of the main issues with unstable assignments
> > > > > today is
> > > > > > >> the
> > > > > > >> > >>>> fact
> > > > > > >> > >>>>>>>> that assignors rely on deterministic assignment and
> > > use the
> > > > > > >> > process
> > > > > > >> > >>>> Id,
> > > > > > >> > >>>>>>>> which is not configurable and only persisted via local
> > > > > disk in
> > > > > > >> a
> > > > > > >> > >>>>>>>> best-effort attempt. It would be a very small change 
> > > > > > >> > >>>>>>>> to
> > > > > include
> > > > > > >> > >>>> this in
> > > > > > >> > >>>>>>>> KIP-1071 (like 5 LOC), WDYT? (Would even be willing to
> > > do
> > > > > the
> > > > > > >> PR
> > > > > > >> > for
> > > > > > >> > >>>>>> this
> > > > > > >> > >>>>>>>> myself so as not to add to your load). There's a 
> > > > > > >> > >>>>>>>> ticket
> > > > > for it
> > > > > > >> > here:
> > > > > > >> > >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-15190
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S12. What exactly is the member epoch and how is it
> > > > > defined?
> > > > > > >> When
> > > > > > >> > >>>> is it
> > > > > > >> > >>>>>>>> bumped? I see in the HB request field that certain
> > > values
> > > > > > >> signal a
> > > > > > >> > >>>>>> member
> > > > > > >> > >>>>>>>> joining/leaving, but I take it there are valid 
> > > > > > >> > >>>>>>>> positive
> > > > > values
> > > > > > >> > >>>> besides
> > > > > > >> > >>>>>> the
> > > > > > >> > >>>>>>>> 0/-1/-2 codes? More on this in the next question:
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S13. The KIP says the group epoch is updated in these
> > > three
> > > > > > >> cases:
> > > > > > >> > >>>>>>>> a. Every time the topology is updated through the
> > > > > > >> > >>>>>> StreamsGroupInitialize API
> > > > > > >> > >>>>>>>> b. When a member with an assigned warm-up task reports
> > > a
> > > > > task
> > > > > > >> > >>>> changelog
> > > > > > >> > >>>>>>>> offset and task changelog end offset whose difference
> > > is
> > > > > less
> > > > > > >> that
> > > > > > >> > >>>>>>>> acceptable.recovery.lag.
> > > > > > >> > >>>>>>>> c. When a member updates its topology ID, rack ID,
> > > client
> > > > > tags
> > > > > > >> or
> > > > > > >> > >>>>>> process
> > > > > > >> > >>>>>>>> ID. Note: Typically, these do not change within the
> > > > > lifetime
> > > > > > >> of a
> > > > > > >> > >>>>>> Streams
> > > > > > >> > >>>>>>>> client, so this only happens when a member with static
> > > > > > >> membership
> > > > > > >> > >>>>>> rejoins
> > > > > > >> > >>>>>>>> with an updated configuration.
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S13.i First, just to clarify, these are not the *only*
> > > > > times
> > > > > > >> the
> > > > > > >> > >>>> group
> > > > > > >> > >>>>>>>> epoch is bumped but rather the additional cases on top
> > > of
> > > > > the
> > > > > > >> > >>>> regular
> > > > > > >> > >>>>>>>> consumer group protocol -- so it's still bumped when a
> > > new
> > > > > > >> member
> > > > > > >> > >>>> joins
> > > > > > >> > >>>>>> or
> > > > > > >> > >>>>>>>> leaves, etc, right?
> > > > > > >> > >>>>>>>> S13.ii Second, does (b) imply the epoch is bumped just
> > > > > whenever
> > > > > > >> > the
> > > > > > >> > >>>>>> broker
> > > > > > >> > >>>>>>>> notices a task has finished warming up, or does it
> > > first
> > > > > > >> reassign
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>> task
> > > > > > >> > >>>>>>>> and then bump the group epoch as a result of this task
> > > > > > >> > reassignment?
> > > > > > >> > >>>>>>>> S13.iii Third, (a) mentions the group epoch is bumped
> > > on
> > > > > each
> > > > > > >> > >>>>>>>> StreamsGroupInitialize API but (c) says it gets bumped
> > > > > > >> whenever a
> > > > > > >> > >>>> member
> > > > > > >> > >>>>>>>> updates its topology ID. Does this mean that in the
> > > event
> > > > > of a
> > > > > > >> > >>>> rolling
> > > > > > >> > >>>>>>>> upgrade that changes the topology, the group epoch is
> > > > > bumped
> > > > > > >> just
> > > > > > >> > >>>> once
> > > > > > >> > >>>>>> or
> > > > > > >> > >>>>>>>> for each member that updates its topology ID? Or does
> > > the
> > > > > > >> > "updating
> > > > > > >> > >>>> its
> > > > > > >> > >>>>>>>> topology ID" somehow have something to do with static
> > > > > > >> membership?
> > > > > > >> > >>>>>>>> S13.iv How does the broker know when a member has
> > > changed
> > > > > its
> > > > > > >> rack
> > > > > > >> > >>>> ID,
> > > > > > >> > >>>>>>>> client tags, etc -- does it compare these values on
> > > every
> > > > > hb??
> > > > > > >> > That
> > > > > > >> > >>>>>> seems
> > > > > > >> > >>>>>>>> like a lot of broker load. If this is all strictly for
> > > > > static
> > > > > > >> > >>>> membership
> > > > > > >> > >>>>>>>> then have we considered leaving static membership
> > > support
> > > > > for a
> > > > > > >> > >>>> followup
> > > > > > >> > >>>>>>>> KIP? Not necessarily advocating for that, just
> > > wondering if
> > > > > > >> this
> > > > > > >> > was
> > > > > > >> > >>>>>>>> thought about already. Imo it would be acceptable to
> > > not
> > > > > > >> support
> > > > > > >> > >>>> static
> > > > > > >> > >>>>>>>> membership in the first version (especially if we 
> > > > > > >> > >>>>>>>> fixed
> > > > > other
> > > > > > >> > issues
> > > > > > >> > >>>>>> like
> > > > > > >> > >>>>>>>> making the process Id configurable to get actual 
> > > > > > >> > >>>>>>>> stable
> > > > > > >> > >>>> assignments!)
> > > > > > >> > >>>>>>>> S13.v I'm finding the concept of group epoch here to 
> > > > > > >> > >>>>>>>> be
> > > > > > >> somewhat
> > > > > > >> > >>>>>>>> overloaded. It sounds like it's trying to keep track 
> > > > > > >> > >>>>>>>> of
> > > > > > >> multiple
> > > > > > >> > >>>> things
> > > > > > >> > >>>>>>>> within a single version: the group membership, the
> > > topology
> > > > > > >> > >>>> version, the
> > > > > > >> > >>>>>>>> assignment version, and various member statuses (eg
> > > rack
> > > > > > >> ID/client
> > > > > > >> > >>>>>> tags).
> > > > > > >> > >>>>>>>> Personally, I think it would be extremely valuable to
> > > > > unroll
> > > > > > >> all
> > > > > > >> > of
> > > > > > >> > >>>> this
> > > > > > >> > >>>>>>>> into separate epochs with clear definitions and
> > > boundaries
> > > > > and
> > > > > > >> > >>>> triggers.
> > > > > > >> > >>>>>>>> For example, I would suggest something like this:
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> group epoch: describes the group at a point in time,
> > > bumped
> > > > > > >> when
> > > > > > >> > >>>> set of
> > > > > > >> > >>>>>>>> member ids changes (ie static or dynamic member leaves
> > > or
> > > > > > >> joins)
> > > > > > >> > --
> > > > > > >> > >>>>>>>> triggered by group coordinator noticing a change in
> > > group
> > > > > > >> > membership
> > > > > > >> > >>>>>>>> topology epoch: describes the topology version, bumped
> > > for
> > > > > each
> > > > > > >> > >>>>>> successful
> > > > > > >> > >>>>>>>> StreamsGroupInitialize that changes the broker's
> > > topology
> > > > > ID --
> > > > > > >> > >>>>>> triggered
> > > > > > >> > >>>>>>>> by group coordinator bumping the topology ID (ie
> > > > > > >> > >>>> StreamsGroupInitialize)
> > > > > > >> > >>>>>>>> (optional) member epoch: describes the member version,
> > > > > bumped
> > > > > > >> > when a
> > > > > > >> > >>>>>>>> memberId changes its rack ID, client tags, or process
> > > ID
> > > > > (maybe
> > > > > > >> > >>>> topology
> > > > > > >> > >>>>>>>> ID, not sure) -- is this needed only for static
> > > membership?
> > > > > > >> Going
> > > > > > >> > >>>> back
> > > > > > >> > >>>>>> to
> > > > > > >> > >>>>>>>> question S12, when is the member epoch is bumped in 
> > > > > > >> > >>>>>>>> the
> > > > > current
> > > > > > >> > >>>>>> proposal?
> > > > > > >> > >>>>>>>> assignment epoch: describes the assignment version,
> > > bumped
> > > > > when
> > > > > > >> > the
> > > > > > >> > >>>>>>>> assignor runs successfully (even if the actual
> > > assignment
> > > > > of
> > > > > > >> tasks
> > > > > > >> > >>>> does
> > > > > > >> > >>>>>> not
> > > > > > >> > >>>>>>>> change) -- can be triggered by tasks completing
> > > warmup, a
> > > > > > >> manual
> > > > > > >> > >>>>>>>> client-side trigger (future KIP only), etc (by
> > > definition
> > > > > is
> > > > > > >> > bumped
> > > > > > >> > >>>> any
> > > > > > >> > >>>>>>>> time any of the other epochs are bumped since they all
> > > > > trigger
> > > > > > >> a
> > > > > > >> > >>>>>>>> reassignment)
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S14. The KIP also mentions a new concept of an
> > > "assignment
> > > > > > >> epoch"
> > > > > > >> > >>>> but
> > > > > > >> > >>>>>>>> doesn't seem to ever elaborate on how this is defined
> > > and
> > > > > what
> > > > > > >> > it's
> > > > > > >> > >>>> used
> > > > > > >> > >>>>>>>> for. I assume it's bumped each time the assignment
> > > changes
> > > > > and
> > > > > > >> > >>>>>> represents a
> > > > > > >> > >>>>>>>> sort of "assignment version", is that right? Can you
> > > > > describe
> > > > > > >> in
> > > > > > >> > >>>> more
> > > > > > >> > >>>>>>>> detail the difference between the group epoch and the
> > > > > > >> assignment
> > > > > > >> > >>>> epoch?
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S15. I assume the "InstanceId" field in the HB request
> > > > > > >> corresponds
> > > > > > >> > >>>> to
> > > > > > >> > >>>>>> the
> > > > > > >> > >>>>>>>> group.instance.id config of static membership. I
> > > always
> > > > > felt
> > > > > > >> this
> > > > > > >> > >>>> field
> > > > > > >> > >>>>>>>> name was too generic and easy to get mixed up with
> > > other
> > > > > > >> > >>>> identifiers,
> > > > > > >> > >>>>>> WDYT
> > > > > > >> > >>>>>>>> about "StaticId" or "StaticInstanceId" to make the
> > > static
> > > > > > >> > membership
> > > > > > >> > >>>>>>>> relation more explicit?
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> S16. what is the "flexibleVersions" field in the RPC
> > > > > > >> protocols? I
> > > > > > >> > >>>> assume
> > > > > > >> > >>>>>>>> this is related to versioning compatibility but it's
> > > not
> > > > > quite
> > > > > > >> > clear
> > > > > > >> > >>>>>> from
> > > > > > >> > >>>>>>>> the KIP how this is to be used. For example if we
> > > modify
> > > > > the
> > > > > > >> > >>>> protocol by
> > > > > > >> > >>>>>>>> adding new fields at the end, we'd bump the version 
> > > > > > >> > >>>>>>>> but
> > > > > should
> > > > > > >> the
> > > > > > >> > >>>>>>>> flexibleVersions field change as well?
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>> On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy
> > > > > > >> > >>>>>>>> <lbruts...@confluent.io.invalid> wrote:
> > > > > > >> > >>>>>>>>
> > > > > > >> > >>>>>>>>> Hi Sophie,
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> Thanks for your detailed comments - much appreciated!
> > > I
> > > > > think
> > > > > > >> you
> > > > > > >> > >>>> read
> > > > > > >> > >>>>>>>>> a version of the KIP that did not yet include the
> > > admin
> > > > > > >> > >>>> command-line
> > > > > > >> > >>>>>>>>> tool and the Admin API extensions, so some of the
> > > > > comments are
> > > > > > >> > >>>> already
> > > > > > >> > >>>>>>>>> addressed in the KIP.
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S1. StreamsGroupHeartbeat and StreamsGroupInitialize
> > > are
> > > > > > >> called
> > > > > > >> > in
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>>>> consumer background thread. Note that in the new
> > > consumer
> > > > > > >> > threading
> > > > > > >> > >>>>>>>>> model, all RPCs are run by the background thread.
> > > Check
> > > > > out
> > > > > > >> this:
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>
> > > > > > >> > >>>>
> > > > > > >> >
> > > > > > >>
> > > > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
> > > > > > >> > >>>>>>>>> for more information. Both our RPCs are just part of
> > > the
> > > > > group
> > > > > > >> > >>>>>>>>> membership management and do not need to be invoked
> > > > > > >> explicitly by
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>>>> application thread. You could imagine calling the
> > > > > initialize
> > > > > > >> RPC
> > > > > > >> > >>>>>>>>> explicitly (to implement explicit initialization), 
> > > > > > >> > >>>>>>>>> but
> > > > > this
> > > > > > >> would
> > > > > > >> > >>>>>>>>> still mean sending an event to the background thread,
> > > and
> > > > > the
> > > > > > >> > >>>>>>>>> background thread in turn invokes the RPC. However,
> > > > > explicit
> > > > > > >> > >>>>>>>>> initialization would require some additional public
> > > > > interfaces
> > > > > > >> > >>>> that we
> > > > > > >> > >>>>>>>>> are not including in this KIP. StreamsGroupDescribe 
> > > > > > >> > >>>>>>>>> is
> > > > > called
> > > > > > >> by
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>>>> AdminClient, and used by the command-line tool
> > > > > > >> > >>>>>>>>> kafka-streams-groups.sh.
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S2. I think the max.warmup.replicas=100 suggested by
> > > Nick
> > > > > was
> > > > > > >> > >>>> intended
> > > > > > >> > >>>>>>>>> as the upper limit for setting the group
> > > configuration on
> > > > > the
> > > > > > >> > >>>> broker.
> > > > > > >> > >>>>>>>>> Just to make sure that this was not a
> > > misunderstanding. By
> > > > > > >> > default,
> > > > > > >> > >>>>>>>>> values above 100 should be rejected when setting a
> > > > > specific
> > > > > > >> value
> > > > > > >> > >>>> for
> > > > > > >> > >>>>>>>>> group. Are you suggesting 20 or 30 for the default
> > > value
> > > > > for
> > > > > > >> > >>>> groups,
> > > > > > >> > >>>>>>>>> or the default upper limit for the group
> > > configuration?
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S3. Yes, it's supposed to be used like
> > > > > SHUTDOWN_APPLICATION.
> > > > > > >> The
> > > > > > >> > >>>>>>>>> MemberEpoch=-1 is a left-over from an earlier
> > > discussion.
> > > > > It
> > > > > > >> > means
> > > > > > >> > >>>>>>>>> that the member is leaving the group, so the
> > > intention was
> > > > > > >> that
> > > > > > >> > the
> > > > > > >> > >>>>>>>>> member must leave the group when it asks the other
> > > > > members to
> > > > > > >> > shut
> > > > > > >> > >>>>>>>>> down. We later reconsidered this and decided that all
> > > > > > >> > applications
> > > > > > >> > >>>>>>>>> should just react to the shutdown application signal
> > > that
> > > > > is
> > > > > > >> > >>>> returned
> > > > > > >> > >>>>>>>>> by the broker, so the client first sets the
> > > > > > >> ShutdownApplication
> > > > > > >> > and
> > > > > > >> > >>>>>>>>> later leaves the group. Thanks for spotting this, I
> > > > > removed
> > > > > > >> it.
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S4. Not sure if this refers to the latest version of
> > > the
> > > > > KIP.
> > > > > > >> We
> > > > > > >> > >>>> added
> > > > > > >> > >>>>>>>>> an extension of the admin API and a
> > > > > kafka-streams-groups.sh
> > > > > > >> > >>>>>>>>> command-line tool to the KIP already.
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S5. All RPCs for dealing with offsets will keep
> > > working
> > > > > with
> > > > > > >> > >>>> streams
> > > > > > >> > >>>>>>>>> groups. The extension of the admin API is rather
> > > cosmetic,
> > > > > > >> since
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>>>> method names use "consumer group". The RPCs, however,
> > > are
> > > > > > >> generic
> > > > > > >> > >>>> and
> > > > > > >> > >>>>>>>>> do not need to be changed.
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S6. Yes, you can use the DeleteGroup RPC with any
> > > group
> > > > > ID,
> > > > > > >> > whether
> > > > > > >> > >>>>>>>>> streams group or not.
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S7. See the admin API section.
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S8. I guess for both A and B, I am not sure what you
> > > are
> > > > > > >> > >>>> suggesting.
> > > > > > >> > >>>>>>>>> Do you want to make the broker-side topology 
> > > > > > >> > >>>>>>>>> immutable
> > > > > and not
> > > > > > >> > >>>> include
> > > > > > >> > >>>>>>>>> any information about the topology, like the topology
> > > ID
> > > > > in
> > > > > > >> the
> > > > > > >> > >>>> RPC?
> > > > > > >> > >>>>>>>>> It would seem that this would be a massive food-gun
> > > for
> > > > > > >> people,
> > > > > > >> > if
> > > > > > >> > >>>>>>>>> they start changing their topology and don't notice
> > > that
> > > > > the
> > > > > > >> > >>>> broker is
> > > > > > >> > >>>>>>>>> looking at a completely different version of the
> > > > > topology. Or
> > > > > > >> did
> > > > > > >> > >>>> you
> > > > > > >> > >>>>>>>>> mean that there is some kind of topology ID, so that
> > > at
> > > > > least
> > > > > > >> we
> > > > > > >> > >>>> can
> > > > > > >> > >>>>>>>>> detect inconsistencies between broker and client-side
> > > > > > >> topologies,
> > > > > > >> > >>>> and
> > > > > > >> > >>>>>>>>> we fence out any member with an incorrect topology 
> > > > > > >> > >>>>>>>>> ID?
> > > > > Then we
> > > > > > >> > >>>> seem to
> > > > > > >> > >>>>>>>>> end up with mostly the same RPCs and the same
> > > questions
> > > > > (how
> > > > > > >> is
> > > > > > >> > the
> > > > > > >> > >>>>>>>>> topology ID generated?). I agree that the latter
> > > could be
> > > > > an
> > > > > > >> > >>>> option.
> > > > > > >> > >>>>>>>>> See summary at the end of this message.
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S9. If we flat out refuse topology updates, agree - 
> > > > > > >> > >>>>>>>>> we
> > > > > cannot
> > > > > > >> let
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>>>> application crash on minor changes to the topology.
> > > > > However,
> > > > > > >> if
> > > > > > >> > we
> > > > > > >> > >>>>>>>>> allow topology updates as described in the KIP, there
> > > are
> > > > > only
> > > > > > >> > >>>> upsides
> > > > > > >> > >>>>>>>>> to making the topology ID more sensitive. All it will
> > > > > cause is
> > > > > > >> > >>>> that a
> > > > > > >> > >>>>>>>>> client will have to resend a
> > > `StreamsGroupInitialize`, and
> > > > > > >> during
> > > > > > >> > >>>> the
> > > > > > >> > >>>>>>>>> rolling bounce, older clients will not get tasks
> > > assigned.
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> S10. The intention in the KIP is really just this -
> > > old
> > > > > > >> clients
> > > > > > >> > can
> > > > > > >> > >>>>>>>>> only retain tasks, but not get new ones. If the
> > > topology
> > > > > has
> > > > > > >> > >>>>>>>>> sub-topologies, these will initially be assigned to
> > > the
> > > > > new
> > > > > > >> > >>>> clients,
> > > > > > >> > >>>>>>>>> which also have the new topology. You are right that
> > > > > rolling
> > > > > > >> an
> > > > > > >> > >>>>>>>>> application where the old structure and the new
> > > structure
> > > > > are
> > > > > > >> > >>>>>>>>> incompatible (e.g. different subtopologies access the
> > > same
> > > > > > >> > >>>> partition)
> > > > > > >> > >>>>>>>>> will cause problems. But this will also cause
> > > problems in
> > > > > the
> > > > > > >> > >>>> current
> > > > > > >> > >>>>>>>>> protocol, so I'm not sure, if it's strictly a
> > > regression,
> > > > > it's
> > > > > > >> > just
> > > > > > >> > >>>>>>>>> unsupported (which we can only make the best effort 
> > > > > > >> > >>>>>>>>> to
> > > > > > >> detect).
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> In summary, for the topology ID discussion, I mainly
> > > see
> > > > > two
> > > > > > >> > >>>> options:
> > > > > > >> > >>>>>>>>> 1) stick with the current KIP proposal
> > > > > > >> > >>>>>>>>> 2) define the topology ID as the hash of the
> > > > > > >> > StreamsGroupInitialize
> > > > > > >> > >>>>>>>>> topology metadata only, as you suggested, and fence
> > > out
> > > > > > >> members
> > > > > > >> > >>>> with
> > > > > > >> > >>>>>>>>> an incompatible topology ID.
> > > > > > >> > >>>>>>>>> I think doing 2 is also a good option, but in the end
> > > it
> > > > > comes
> > > > > > >> > >>>> down to
> > > > > > >> > >>>>>>>>> how important we believe topology updates (where the
> > > set
> > > > > of
> > > > > > >> > >>>>>>>>> sub-topologies or set of internal topics are
> > > affected) to
> > > > > be.
> > > > > > >> The
> > > > > > >> > >>>> main
> > > > > > >> > >>>>>>>>> goal is to make the new protocol a drop-in 
> > > > > > >> > >>>>>>>>> replacement
> > > > > for the
> > > > > > >> > old
> > > > > > >> > >>>>>>>>> protocol, and at least define the RPCs in a way that
> > > we
> > > > > won't
> > > > > > >> > need
> > > > > > >> > >>>>>>>>> another KIP which bumps the RPC version to make the
> > > > > protocol
> > > > > > >> > >>>>>>>>> production-ready. Adding more command-line tools, or
> > > > > public
> > > > > > >> > >>>> interfaces
> > > > > > >> > >>>>>>>>> seems less critical as it doesn't change the protocol
> > > and
> > > > > can
> > > > > > >> be
> > > > > > >> > >>>> done
> > > > > > >> > >>>>>>>>> easily later on. The main question becomes - is the
> > > > > protocol
> > > > > > >> > >>>>>>>>> production-ready and sufficient to replace the 
> > > > > > >> > >>>>>>>>> classic
> > > > > > >> protocol
> > > > > > >> > if
> > > > > > >> > >>>> we
> > > > > > >> > >>>>>>>>> go with 2?
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>>>> Cheers,
> > > > > > >> > >>>>>>>>> Lucas
> > > > > > >> > >>>>>>>>>
> > > > > > >> > >>>>>>
> > > > > > >> > >>>>
> > > > > > >> > >>>>
> > > > > > >> > >>>
> > > > > > >> > >>
> > > > > > >> >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > >
> > >
> > >

Reply via email to