Actually, scratch that. On reflection I think I prefer Bruno's original
idea to specify it in the configuration.

Cheers,
Nick

On Sat, 16 Nov 2024 at 17:59, Nick Telford <nick.telf...@gmail.com> wrote:

> Hey everyone,
>
> With respect to Bruno's proposal, could instances cache their topology
> epoch on disk, and then upgrades/downgrades would simply involve deleting
> the cached epoch before starting the instance?
>
> My thinking is that this might be potentially simpler for users than
> modifying configuration.
>
> Cheers,
> Nick
>
> On Sat, 16 Nov 2024, 00:41 Sophie Blee-Goldman, <sop...@responsive.dev>
> wrote:
>
>> Thanks for the updates! Few minor questions before we wrap this up and
>> move
>> to a vote:
>>
>> S1. Can you clarify the outward-facing behavior of a group that enters the
>> NOT_READY state due to, say, missing source topics? It sounds like the
>> application will continue to run without processing anything, which would
>> be a change compared to today where we shut down with an explicit error.
>> Am
>> I reading this correctly and is this change intentional?
>>
>> S2. Regarding topology upgrades, I am fully in favor of Bruno's proposal.
>> Just feel like we need to flesh it out a bit. For example you say  "no
>> specific tooling" is needed, but how would a user go about upgrading the
>> topology? If we can just describe this process in a bit more detail then I
>> would feel good.
>>
>> S3. Lastly, just a quick reminder, we should make sure to clarify in the
>> KIP how the topology ID will be computed since that impacts which kind of
>> upgrades are possible and is therefore part of the public API. IIUC the
>> plan is to compute it from the topics which makes sense to me (though I do
>> think it should eventually be pluggable, perhaps in a followup KIP)
>>
>> On Thu, Nov 7, 2024 at 10:08 AM Bruno Cadonna <cado...@apache.org> wrote:
>>
>> > Hi Lucas and all others,
>> >
>> > Thanks for the proposals regarding the topology upgrades!
>> >
>> > I have an additional proposal:
>> >
>> > We introduce a topology epoch alongside the topology ID. The topology
>> > epoch is set to 0 on the group coordinator for a new group. The topology
>> > epoch is also a config within the Streams client, the default is 1.
>> >
>> > When the Streams client starts up, it sends its topology epoch alongside
>> > the topology ID and the topology to the group coordinator. The group
>> > coordinator initializes the topology with the given topology ID and
>> > increases the topology epoch to 1.
>> >
>> > When the client wants to upgrade a topology it needs to send a valid
>> > topology epoch alongside the new topology ID and the topology. We can
>> > decide if the new topology epoch = current topology at group coordinator
>> > + 1 or just new topology epoch > current topology on group coordinator.
>> >
>> > When the group coordinator receives a topology ID that differs from the
>> > one intialized for the group, the group coordinator only upgrades to the
>> > new topology if the topology epoch is valid. Then, it sets the topology
>> > epoch to the new value. If the topology epoch is not valid, that means
>> > the topology ID is an old one and the group coordinator does not
>> > re-initialize the topology.
>> >
>> > The current topology epoch can be retrieved with the describe RPC.
>> >
>> > Accidental rollbacks of topologies are not be possible. Only wanted
>> > rollbacks of topologies with an appropriate topology epoch are allowed.
>> >
>> > Specific tooling is not needed.
>> >
>> > The topology upgrade can be automated in CI in the following way:
>> >
>> > 1. Call streams group describe to determine the current epoch.
>> >
>> > 2. Set the Streams client config topology.epoch = current epoch + 1
>> >
>> > 3. Roll the application.
>> >
>> > WDYT?
>> >
>> > Best,
>> > Bruno
>> >
>> >
>> > On 07.11.24 18:26, Lucas Brutschy wrote:
>> > > Hi all,
>> > >
>> > > I have updated the KIP with some details around how we handle the
>> > > cases when essential topics required by the topology are not present.
>> > > This is described in a new section "Handling topic topology
>> > > mismatches". The short summary is that we enter a state where the
>> > > group member's heartbeats will succeed, but no assignments will be
>> > > made, and the nature of the topic misconfiguration is communicated to
>> > > the members using the `status` field of the heartbeat response.
>> > >
>> > > Working towards putting up this KIP up for a vote, there was one last
>> > > concern mentioned during our last sync, which was a concern raised by
>> > > Sophie and others around topology updating. I would like to propose
>> > > the following changes to the KIP to solidify the story around topology
>> > > updating.
>> > >
>> > > To recap, a member sends its topology metadata every time it attempts
>> > > to join a streams group. When the group already has a topology set,
>> > > the KIP proposes to update the topology for the group immediately with
>> > > that first heartbeat of a member. All other members will then receive
>> > > a non-fatal group status INCONSISTENT_TOPOLOGY, letting them know that
>> > > their topology is not equal to the groups topology anymore. Those
>> > > members with an outdated topology will be able to retain their current
>> > > tasks, but will not receive new tasks, until they have also updated
>> > > their topology. This enables a rolling bounce to upgrade the topology.
>> > >
>> > > There is a concern with this behavior, raised both in our discussions
>> > > but also by Guozhang and Sophie. The problem is that a “misbehaving”
>> > > member can always roll back a topology upgrade, if it is still running
>> > > with the old version of the topology. For example, if we roll 20
>> > > members to a new topology version, and we have successfully restarted
>> > > 19 out of 20, but the last member is restarted with the old topology,
>> > > this will cause the topology version on the broker to be rolled back.
>> > > This could happen, for example, because topology update fails on the
>> > > last member, or the last member runs into a fatal error of some kind
>> > > (out-of-memory or being fenced, for example). Once the failed member
>> > > rejoins, it updates the topology on the broker to the “old” topology,
>> > > so performs an involuntary roll-back. Now, we would be in a situation
>> > > where 19 out of 20 members are blocked from getting new tasks
>> > > assigned. We would be in this state just temporary, until the final,
>> > > misbehaving member is restarted correctly with the new topology.
>> > >
>> > > The original suggestion in the KIP discussion to solve these issues
>> > > (for now) was to exclude topology updates from the KIP. However, it
>> > > was confirmed by several people (such as Anna) that topology updates
>> > > are quite common in the wild, so for having a production-ready
>> > > implementation of the new protocol, a mechanism for topology updates
>> > > is required. It is not clear how strict the requirement for rolling
>> > > updates in the first version of the protocol. No ‘better’ alternative
>> > > for updating topologies was suggested.
>> > >
>> > > Proposal 1) Making topology updating optional: We modify the heartbeat
>> > > request to allow joining a group without sending a topology (only a
>> > > topology ID). When joining a group this way, the member is explicitly
>> > > requesting to not update the topology of the group. In other words,
>> > > leaving the topology as null, will mean that no topology update will
>> > > be performed on the broker side. If the topology ID does not match the
>> > > topology ID of the group, and no topology update is performed, and the
>> > > member that joined will just behave like any other member with an
>> > > inconsistent topology ID. That is, it will not get any tasks assigned.
>> > > Streams clients will never send the topology when they know they are
>> > > just rejoining the group, e.g. because the client was fenced and
>> > > dropped out of the group. That means, a topology update is only
>> > > initiated when a streams application is first started up. This will
>> > > prevent "accidental" topology downgrades if we know that we are
>> > > rejoining the group. However, if we rejoin the group without knowing
>> > > that we are just rejoining (e.g. since the whole pod was restarted),
>> > > we could still run into temporary topology downgrades.
>> > >
>> > > Proposal 2) Disable topology updates by default on the client:
>> > > Second, we can disable topology updates by default from the client
>> > > side, by extending the streams configuration with a config
>> > > enable_topology_updates which defaults to false and extending the
>> > > heartbeat RPC by a flag UpdateTopology which propagates that config
>> > > value to the broker. The advantage of having a client-side config is,
>> > > that, during a roll, I can set the configuration only for updated
>> > > clients, which can ensure that no accidental rollbacks happen. The
>> > > downside is that after the roll, I would have to do a second roll to
>> > > disable enable_topology_updates again, so that we are safe during the
>> > > next roll again. However, avoiding the second roll is more of a
>> > > usability optimization, which is conceptually more complex and could
>> > > address in a follow-up KIP, as in proposal 3.
>> > >
>> > > Proposal 3) Fixing the topology ID during upgrades (follow-up KIP): To
>> > > fully prevent accidental downgrades, one could introduce a broker-side
>> > > group-level config that restricts the possible topology IDs that a
>> > > client can upgrade to. This would be quite similar in safety as making
>> > > topology updates manual. For updating a topology, the user would have
>> > > to first, extract the topology ID from the update topology - this
>> > > could e.g. be logged when the application is started, second, set the
>> > > group-level config using an admin client (something like
>> > > group.streams.allowed_upgrade_topology_id), third, roll the
>> > > application. This would require quite some extra tooling (how to get
>> > > the topology ID), so I’d propose introducing this in a follow-up KIP.
>> > >
>> > > Let me know if there are any concerns with this approach.
>> > >
>> > > Cheers,
>> > > Lucas
>> > >
>> > > On Mon, Oct 7, 2024 at 9:42 AM Bruno Cadonna <cado...@apache.org>
>> wrote:
>> > >>
>> > >> Hi all,
>> > >>
>> > >> we did some major changes to this KIP that we would like the
>> community
>> > >> to review. The changes are the following:
>> > >>
>> > >> 1. We merged the Streams group initialize RPC into the Streams group
>> > >> heartbeat RPC. We decided to merge them because it simplifies the
>> > >> sychronization between initialization and heartbeat during group
>> > >> creation and migration from a classic group. A consequence of the
>> merge
>> > >> is that the heartbeat now needs permissions to create and describe
>> > >> topics which before only the initialize call had. But we think that
>> is
>> > >> not a big deal. A consumer of a Streams client will send the
>> metadata of
>> > >> its topology in its first heartbeat when it joins the group. After
>> that
>> > >> it will not send the topology metadata anymore.
>> > >>
>> > >> 2. We discovered that to create the internal topics, the topology
>> > >> metadata also need to include the co-partition groups since we cannot
>> > >> infer the co-partitioning solely from the sub-topologies and the
>> input
>> > >> topics of the sub-topologies.
>> > >>
>> > >> 3. We added a section that describes how the migration from a classic
>> > >> group to a streams group and back works. This is very similar to how
>> the
>> > >> migration in KIP-848 works.
>> > >>
>> > >>
>> > >> Please give us feedback about the changes and the KIP in general.
>> > >>
>> > >> We would like to start a vote on this KIP as soon as possible.
>> > >>
>> > >> Best,
>> > >> Bruno
>> > >>
>> > >> On 11.09.24 01:06, Sophie Blee-Goldman wrote:
>> > >>> Just following up to say thank you Lucas for the detailed
>> > explanations, and
>> > >>> especially the thorough response to my more "existential" questions
>> > about
>> > >>> building off 848 vs introducing a separate Streams group protocol.
>> This
>> > >>> really helped me understand the motivations behind this decision. No
>> > notes!
>> > >>>
>> > >>> On Fri, Sep 6, 2024 at 7:17 AM Lucas Brutschy
>> > >>> <lbruts...@confluent.io.invalid> wrote:
>> > >>>
>> > >>>> Hi Sophie,
>> > >>>>
>> > >>>> thanks for getting deeply involved in this discussion.
>> > >>>>
>> > >>>> S16. Adding tagged fields would not require an RPC version bump at
>> > >>>> all. This should already a cover a lot of use cases where
>> requestion
>> > >>>> versioning wouldn't become an issue, as long as it is safe to
>> ignore
>> > >>>> the new fields. Other than that, I'm not aware of a formal
>> definition
>> > >>>> of the what request version bumps mean in relation to Apache Kafka
>> > >>>> versions - if anybody knows, please let me know. In my
>> understanding,
>> > >>>> the Kafka protocol evolves independently of AK versions, and for
>> the
>> > >>>> Kafka protocol to change, you require an accepted KIP, and not an
>> AK
>> > >>>> release. The grey area is that a protocol change without an AK
>> release
>> > >>>> does not mean much, because even if cloud vendors deploy it before
>> an
>> > >>>> AK release, virtually no clients would support it and the protocol
>> > >>>> would fall back to the earlier RPC version. So maybe this isn't
>> that
>> > >>>> clearly formalized because it does not matter much in practice.
>> > >>>> Normally, I would expect an RPC version bump after each KIP that
>> > >>>> breaks RPC compatibility. But there seems to be some flexibility
>> here
>> > >>>> - for example, the KIP-848 RPCs were changed after the fact in
>> trunk,
>> > >>>> because they were not exposed yet by default in AK (or any other
>> > >>>> implementation of the Kafka protocol), so a breaking change without
>> > >>>> request version bump was probably not deemed problematic.
>> > >>>>
>> > >>>> S17/18. We can add an informational table for the interface to the
>> > >>>> assignor, and that should also make clear how a client-side
>> assignment
>> > >>>> interface would look. Generally, our idea would be to send all the
>> > >>>> input information for the task assignor to a client and expect all
>> the
>> > >>>> output information of a task assignor in return, very similar to
>> > >>>> KIP-848. We did not include such a table because we do not define a
>> > >>>> public interface to define custom broker-side assignors in this
>> KIP.
>> > >>>> But generally, the input / output of the (task) assignor mostly
>> > >>>> follows the data fields sent to / from the brokers. Let me do that,
>> > >>>> but first I'll respond to the rest of your questions that seem more
>> > >>>> important.
>> > >>>>
>> > >>>> S19. I think an explicit trigger for rebalance could make sense,
>> but
>> > >>>> let me explain the thinking why we haven't included this in the
>> > >>>> current KIP: We don't want to include it for client-side assignment
>> > >>>> obviously, because this KIP does not include client-side
>> assignment.
>> > >>>> For triggering a rebalance when a warm-up has caught up, this is
>> > >>>> treated differently in the KIP: the task assignor is called when a
>> > >>>> client sends updated task offsets. The client does not update its
>> task
>> > >>>> offsets regularly, but according to an interval
>> > >>>> `task.offset.interval.ms`, but also immediately when a client
>> notices
>> > >>>> that a warm-up has caught up. So effectively, it is mostly the
>> client
>> > >>>> that is constantly checking the lag. I absolutely believe that the
>> > >>>> client should be checking for `acceptable.recovery.lag`, and we are
>> > >>>> using the presence of the task offset field to trigger
>> reassignment.
>> > >>>> What we weren't sure is how often we need the task offsets to be
>> > >>>> updated and how often we want to retrigger the assignment because
>> of
>> > >>>> that. An explicit field for "forcing" a reassignment may still make
>> > >>>> sense, so that we can update the task offsets on the broker more
>> often
>> > >>>> than we call the assignor. However, I'm not convinced that this
>> should
>> > >>>> just be a generic "rebalance trigger", as this seems to be going
>> > >>>> against the goals of KIP-848. Generally, I think one of the
>> principles
>> > >>>> of KIP-848 is that a member just describes its current state and
>> > >>>> metadata, and the group coordinator makes a decision on whether
>> > >>>> reassignment needs to be triggered. So it would seem more natural
>> to
>> > >>>> just indicate a boolean "I have caught-up warm-up tasks" to the
>> group
>> > >>>> coordinator. I understand that for client-side assignment, you may
>> > >>>> want to trigger assignment explicitely without the group
>> coordinator
>> > >>>> understanding why, but then we should add such a field as part of
>> the
>> > >>>> client-side assignment KIP.
>> > >>>>
>> > >>>> S20. Yes, inside the group coordinator, the main differences are
>> > >>>> topology initialization handling, and the semantics of task
>> assignment
>> > >>>> vs. partition assignment. It will not be a completely separate
>> group
>> > >>>> coordinator, and we will not change things like the offset reading
>> and
>> > >>>> writing. What will have its own code path is the handling of
>> > >>>> heartbeats and the reconciliation of the group (so, which tasks are
>> > >>>> sent to which member at which point in time until we reach the
>> > >>>> declarative target assignment that was produced by a client-side or
>> > >>>> server-side assignor). Also, the assignor implementations will
>> > >>>> obviously be completely separate from the consumer group assignors,
>> > >>>> but that is already the case currently. Streams groups will
>> co-exist
>> > >>>> in the group coordinator with share groups (for queues), classic
>> > >>>> groups (old generic protocol for consumers and connect) and
>> consumer
>> > >>>> groups (from KIP-848), and potentially a new group type for
>> connect as
>> > >>>> well. Each of these group types have a separate code path for heart
>> > >>>> beating, reconciling assignments, keeping track of the internal
>> state
>> > >>>> of the group and responding to specific requests from admin API,
>> like
>> > >>>> ConsumerGroupDescribe. For streams, there will be a certain amount
>> of
>> > >>>> duplication from consumer groups - for example, liveness tracking
>> for
>> > >>>> members (setting timers for the session timeout etc.) is
>> essentially
>> > >>>> the same between both groups, and the reconciliation logic is quite
>> > >>>> similar. We will make an effort to refactor as much of this code
>> in a
>> > >>>> way that it can be reused by both group types, and a lot of things
>> > >>>> have already been made generic with the introduction of share
>> groups
>> > >>>> in the group coordinator.
>> > >>>>
>> > >>>> S21. It's nice that we are discussing "why are we doing this" in
>> > >>>> question 21 :D. I think we are touching upon the reasoning in the
>> KIP,
>> > >>>> but let me explain the reasoning in more detail. You are right, the
>> > >>>> reason we want to implement custom RPCs for Kafka Streams is not
>> about
>> > >>>> broker-side vs. client-side assignment. The broker could implement
>> the
>> > >>>> Member Metadata versioning scheme of Kafka Streams, intercept
>> consumer
>> > >>>> group heartbeats that look like Kafka Streams heartbeats,
>> deserialize
>> > >>>> the custom metadata and run a broker-side assignor for Kafka
>> Streams,
>> > >>>> and finally serialize custom Kafka Streams assignment metadata, all
>> > >>>> based on KIP-848 RPCs.
>> > >>>>
>> > >>>> First of all, the main difference we see with custom RPCs is really
>> > >>>> that streams groups become a first-level concept for both the
>> broker
>> > >>>> and clients of the Kafka. This also goes into the answer for S20
>> (what
>> > >>>> is different beside topology initialization and task
>> reconciliation).
>> > >>>> With streams groups, I can use the Admin API call to see the
>> current
>> > >>>> assignment, the target assignment, all the metadata necessary for
>> > >>>> assignment, and some information about the current topology in a
>> > >>>> human-readable and machine-readable form, instead of having to
>> parse
>> > >>>> it from logs on some client machine that may or may not have the
>> right
>> > >>>> log-level configured. This will help humans to understand what is
>> > >>>> going on, but can also power automated tools / CLIs / UIs, and I
>> think
>> > >>>> this is a major improvement for operating a streams application.
>> > >>>> Again, this is not about who is in control of the broker or the
>> > >>>> clients, the information becomes accessible for everybody who can
>> call
>> > >>>> the Admin API. It also does not depend on the cloud vendor, it
>> will be
>> > >>>> an improvement available for pure AK users, and will also work with
>> > >>>> client-side assignment (although obviously, if there is an
>> intention
>> > >>>> to start encoding proprietary data in black-box byte-arrays, this
>> > >>>> information remains hidden to users of the open source distribution
>> > >>>> again).
>> > >>>>
>> > >>>> I also think that the original proposal of reusing the new consumer
>> > >>>> group protocol for Kafka Streams has some downsides, that, while
>> not
>> > >>>> being deal-breakers, would make the new behaviour of Kafka Streams
>> > >>>> less than ideal. One example is the reconciliation: if we have a
>> > >>>> custom stream partition assignor (client- or broker-side doesn't
>> > >>>> matter), we can only control the target assignment that the group
>> > >>>> converges to, but none of the intermediate states that the group
>> > >>>> transitions through while attempting to reach the desired end
>> state.
>> > >>>> For example, there is no way to prevent the reconciliation loop of
>> > >>>> KIP-848 from giving an active task to processX before another
>> instance
>> > >>>> on processX has revoked the corresponding standby task. There is no
>> > >>>> synchronization point where these movements are done at the same
>> time
>> > >>>> as in the classic protocol, and on the broker we can't prevent
>> this,
>> > >>>> because the reconciliation loop in KIP-848 does not know what a
>> > >>>> standby task is, or what a process ID is. You could argue, you can
>> let
>> > >>>> the application run into LockExceptions in the meantime, but it
>> will
>> > >>>> be one of the things that will make Kafka Streams work less than
>> > >>>> ideal. Another example is task offsets. It is beneficial for
>> streams
>> > >>>> partition assignors to have relatively recent task offset
>> information,
>> > >>>> which, in KIP-848, needs to be sent to the assignor using a
>> black-box
>> > >>>> MemberMetadata byte array. This member metadata byte array is
>> > >>>> persisted to the consumer offset topic every time it changes (for
>> > >>>> failover). So we end up with a bad trade-off: if we send the task
>> > >>>> offset information to the broker very frequently, we will have to
>> > >>>> write those member metadata records very frequently as well. If we
>> do
>> > >>>> not send them frequently, streams task assignors will have very
>> > >>>> outdated lag information. In KIP-1071 we can make the decision to
>> not
>> > >>>> persist the task offset information, which avoids this trade-off.
>> It
>> > >>>> gives us up-to-date lag information in the usual case, but after
>> > >>>> fail-over of the group coordinator, our lag information may be
>> missing
>> > >>>> for a short period of time. These are two examples, but I think the
>> > >>>> general theme is, that with custom RPCs, Kafka Streams is in
>> control
>> > >>>> of how the group metadata is processed and how the tasks are
>> > >>>> reconciled. This is not just a consideration for now, but also the
>> > >>>> future: will KIP-848 reconciliation and metadata management always
>> > >>>> work "good enough" for Kafka Streams, or will there be use cases
>> where
>> > >>>> we want to evolve and customize the behaviour of Kafka Streams
>> groups
>> > >>>> independently of consumer groups.
>> > >>>>
>> > >>>> A final point that I would like to make, I don't think that custom
>> > >>>> streams RPCs are inherently a lot harder to do for the Kafka
>> Streams
>> > >>>> community than "just utilizing" KIP-848, at least not as much as
>> you
>> > >>>> would think. Many things in KIP-848, such customizable assignment
>> > >>>> metadata with a metadata versioning scheme are made to look like
>> > >>>> general-purpose mechanisms, but are effectively mechanisms for
>> Kafka
>> > >>>> Streams and also completely unimplemented at this point in any
>> > >>>> implementation of the Kafka protocol. We know that there is very
>> > >>>> little use of custom partition assignors in Kafka besides Kafka
>> > >>>> Streams, and we can expect the same to be true of these new
>> > >>>> "customizable" features of KIP-848. So, besides the Kafka Streams
>> > >>>> community, there is no one driving the implementation of these
>> > >>>> features.  "Just utilizing" KIP-848 for Kafka Streams would
>> > >>>> effectively mean to implement all this, and still make a lot of
>> deep
>> > >>>> changes to Kafka Streams (as mechanisms like probing rebalances
>> etc.
>> > >>>> don't make sense in KIP-848). I think adapting KIP-848 to work with
>> > >>>> Kafka Streams either way is a big effort. Piggybacking on consumer
>> > >>>> group RPCs and extending the consumer group protocol to the point
>> that
>> > >>>> it can carry Kafka Streams on its back would not make things
>> > >>>> significantly easier. Some things, like online migration from the
>> old
>> > >>>> protocol to the new protocol or reimplementing metadata versioning
>> on
>> > >>>> top of KIP-848, may even be harder in KIP-848, than in KIP-1071. I
>> > >>>> don't think we can consider consumer group RPCs as a shortcut to
>> > >>>> revamp the streams rebalance protocol, but should be focussing on
>> what
>> > >>>> makes Kafka Streams work best down the road.
>> > >>>>
>> > >>>> Hope that makes sense!
>> > >>>>
>> > >>>> Cheers,
>> > >>>> Lucas
>> > >>>>
>> > >>>> On Fri, Sep 6, 2024 at 6:47 AM Sophie Blee-Goldman
>> > >>>> <sop...@responsive.dev> wrote:
>> > >>>>>
>> > >>>>> Thanks for another detailed response Lucas! Especially w.r.t how
>> the
>> > >>>> epochs
>> > >>>>> are defined. I also went back and re-read KIP-848 to refresh
>> myself,
>> > I
>> > >>>>> guess it wasn't clear to me how much of the next-gen consumer
>> > protocol we
>> > >>>>> are reusing vs what's being built from the ground up.
>> > >>>>>
>> > >>>>> S16.
>> > >>>>>
>> > >>>>>>    It will become interesting when we want to include extra data
>> > after
>> > >>>> the
>> > >>>>>> initial protocol definition.
>> > >>>>>
>> > >>>>> I guess that's what I'm asking us to define -- how do we evolve
>> the
>> > >>>> schema
>> > >>>>> when expanding the protocol? I think we need to set a few public
>> > >>>> contracts
>> > >>>>> here, such as whether/which versions get bumped when. For example
>> if
>> > I
>> > >>>>> add/modify fields twice within a single Kafka release, do we still
>> > need
>> > >>>> to
>> > >>>>> update the version? I think the answer is "yes" since I know you
>> > >>>> Confluent
>> > >>>>> folks upgrade brokers more often than AK releases, but this is
>> > different
>> > >>>>> from how eg the SubscriptionInfo/AssignmentInfo of the
>> > >>>>> StreamsPartitionAssignor works today, hence I want to put all
>> that in
>> > >>>>> writing (for those of us who aren't familiar with how all this
>> works
>> > >>>>> already and may be modifying it in the future, say, to add
>> > client-side
>> > >>>>> assignment :P)
>> > >>>>>
>> > >>>>> Maybe a quick example of what to do to evolve this schema is
>> > sufficient?
>> > >>>>>
>> > >>>>> S17.
>> > >>>>> The KIP seems to throw the assignor, group coordinator, topic
>> > >>>>> initialization, and topology versioning all into a single black
>> box
>> > on
>> > >>>> the
>> > >>>>> broker. I understand much of this is intentionally being glossed
>> > over as
>> > >>>> an
>> > >>>>> implementation detail since it's a KIP, but it would really help
>> to
>> > tease
>> > >>>>> apart these distinct players and define their interactions. For
>> > example,
>> > >>>>> what are the inputs and outputs to the assignor? For example
>> could we
>> > >>>> get a
>> > >>>>> table like the "Data Model
>> > >>>>> <
>> > >>>>
>> >
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel
>> > >>>>> "
>> > >>>>> in 848?
>> > >>>>>
>> > >>>>> KIP-848 goes into detail on these things, but again, it's unclear
>> > what
>> > >>>>> parts of 848 are and aren't supposed to be carried through. For
>> > example
>> > >>>> 848
>> > >>>>> has a concept of assignor "reasons" and client-side assignors and
>> > >>>> assignor
>> > >>>>> metadata which can all trigger a rebalance/group epoch bump.
>> However
>> > in
>> > >>>> the
>> > >>>>> current plan for 1071 none of these exist. Which leads me to my
>> next
>> > >>>>> point...
>> > >>>>>
>> > >>>>> S18.
>> > >>>>> Many people use a managed Kafka service like Confluent Cloud where
>> > >>>> brokers
>> > >>>>> are on the bleeding edge but clients are upgraded less frequently.
>> > >>>> However,
>> > >>>>> the opposite is also true for a great many users: client
>> > applications are
>> > >>>>> easy to upgrade and bumped often to get new features, whereas the
>> > brokers
>> > >>>>> (often run by a different team altogether) are considered riskier
>> to
>> > >>>>> upgrade and stay on one version for a long time. We can argue
>> about
>> > which
>> > >>>>> case is more common -- in my personal experience it's actually the
>> > >>>> latter,
>> > >>>>> although I only realized this after my time at Confluent which was
>> > >>>>> obviously skewed towards meeting Confluent Cloud users -- but
>> > regardless,
>> > >>>>> we obviously need to support both cases as first-class citizens.
>> And
>> > I'm
>> > >>>> a
>> > >>>>> bit concerned that there's not enough in the KIP to ensure that
>> > >>>> client-side
>> > >>>>> assignment won't end up getting hacked into the protocol as an
>> > >>>> afterthought.
>> > >>>>>
>> > >>>>> Don't get me wrong, I'm still not demanding you guys implement
>> > >>>> client-side
>> > >>>>> assignors as part of this KIP. I'm happy to leave the
>> implementation
>> > of
>> > >>>>> that to a followup, but I feel we need to flesh out the basic
>> > direction
>> > >>>> for
>> > >>>>> this right now, as part of the v0 Streams protocol.
>> > >>>>>
>> > >>>>> I'm also not asking you to do this alone, and am happy to work
>> > with/for
>> > >>>> you
>> > >>>>> on this. I have one possible proposal sketched out already,
>> although
>> > >>>> before
>> > >>>>> I share that I just wanted to make sure we're aligned on the
>> > fundamentals
>> > >>>>> and have mapped out the protocol in full first (ie S17). Just
>> want to
>> > >>>> have
>> > >>>>> a rough plan for how clients-side assignment will fit into the
>> > picture,
>> > >>>> and
>> > >>>>> right now it's difficult to define because it's unclear where the
>> > >>>>> boundaries between the group coordinator and broker-side assignor
>> > are (eg
>> > >>>>> what are assignor inputs and outputs).
>> > >>>>>
>> > >>>>> I'm committed to ensuring this doesn't add extra work or delay to
>> > this
>> > >>>> KIP
>> > >>>>> for you guys.
>> > >>>>>
>> > >>>>> S19.
>> > >>>>> I think it could make sense to include a concept like the
>> "assignor
>> > >>>>> reason"  from 848  as a top-level field and rebalance trigger (ie
>> > group
>> > >>>>> epoch bump). This would mainly be for the client-side assignor to
>> > >>>> trigger a
>> > >>>>> rebalance, but for another example, it seems like we want the
>> > brokers to
>> > >>>> be
>> > >>>>> constantly monitoring warmup tasks and processing lag updates to
>> know
>> > >>>> when
>> > >>>>> a warmup task is to be switched, which is heavy work. Why not just
>> > let
>> > >>>> the
>> > >>>>> client who owns the warmup and knows the lag decide when it has
>> > warmed up
>> > >>>>> and signal the broker? It's a small thing for a client to check
>> when
>> > its
>> > >>>>> task gets within the acceptable recovery lag and notify via the
>> > heartbeat
>> > >>>>> if so, but it seems like a rather big deal for the broker to be
>> > >>>> constantly
>> > >>>>> computing these checks for every task on every hb. Just food for
>> > thought,
>> > >>>>> I  don't really have a strong preference over where the warmup
>> > decision
>> > >>>>> happens -- simply pointing it out that it fits very neatly into
>> the
>> > >>>>> "assignor reason"/"rebalance requested" framework. Making this a
>> > >>>>> client-side decision also makes it easier to evolve and customize
>> > since
>> > >>>>> Kafka Streams users tend to have more control over
>> > upgrading/implementing
>> > >>>>> stuff in their client application vs the brokers (which is often a
>> > >>>>> different team altogether)
>> > >>>>>
>> > >>>>> S20.
>> > >>>>> To what extent do we plan to reuse implementations of 848 in
>> order to
>> > >>>> build
>> > >>>>> the new Streams group protocol? I'm still having some trouble
>> > >>>> understanding
>> > >>>>> where we are just adding stuff on top and where we plan to rebuild
>> > from
>> > >>>> the
>> > >>>>> ground up. For example we seem to share most of the epoch
>> > reconciliation
>> > >>>>> stuff but will have different triggers for the group epoch bump.
>> Is
>> > it
>> > >>>>> possible to just "plug in" the group epoch and let the consumer's
>> > group
>> > >>>>> coordinator figure stuff out from there? Will we have a completely
>> > >>>> distinct
>> > >>>>> Streams coordinator? I know this is an implementation detail but
>> imo
>> > it's
>> > >>>>> ok to get in the weeds a bit for a large and complex KIP such as
>> > this.
>> > >>>>>
>> > >>>>> Setting aside the topology initialization stuff, is the main
>> > difference
>> > >>>>> really just in how the group coordinator will be aware of tasks
>> for
>> > >>>> Streams
>> > >>>>> groups vs partitions for consumer groups?
>> > >>>>>
>> > >>>>> Which brings me to a high level question I feel I have to ask:
>> > >>>>>
>> > >>>>> S21.
>> > >>>>> Can you go into more detail on why we have to provide Kafka
>> Streams
>> > its
>> > >>>> own
>> > >>>>> RPCs as opposed to just utilizing the protocol from KIP-848? The
>> > Rejected
>> > >>>>> Alternatives does address this but the main argument there seems
>> to
>> > be in
>> > >>>>> implementing a broker-side assignor. But can't you do this with
>> 848?
>> > And
>> > >>>>> given there are users who rarely upgrade their brokers just as
>> there
>> > are
>> > >>>>> users who rarely upgrade their clients, it seems silly to do all
>> this
>> > >>>> work
>> > >>>>> and change the fundamental direction of Kafka Streams just to give
>> > more
>> > >>>>> control over the assignment to the brokers.
>> > >>>>>
>> > >>>>> For a specific example, the KIP says "Client-side assignment
>> makes it
>> > >>>> hard
>> > >>>>> to debug and tune the assignment logic" -- personally, I would
>> find
>> > it
>> > >>>>> infinitely more difficult to tune and debug assignment logic if it
>> > were
>> > >>>>> happening on the broker. But that's just because I work with the
>> app
>> > >>>> devs,
>> > >>>>> not the cluster operators. The point is we shouldn't design an
>> entire
>> > >>>>> protocol to give preference to one vendor or another -- nobody
>> wants
>> > the
>> > >>>>> ASF to start yelling at us  lol D:
>> > >>>>>
>> > >>>>> Another example, which imo is a good one (IIUC) is that "a simple
>> > >>>> parameter
>> > >>>>> change requires redeploying of all streams clients". That's
>> > definitely a
>> > >>>>> huge bummer, but again, why would broker-side assignors not be
>> able
>> > to
>> > >>>>> trigger a reassignment based on a custom config in the 848
>> protocol?
>> > We
>> > >>>>> could still have the ha and sticky assignors implemented on the
>> > brokers,
>> > >>>>> and could still define all the Streams configs as broker configs
>> to
>> > be
>> > >>>>> passed into the streams custom broker assignors. That feels like a
>> > lot
>> > >>>> less
>> > >>>>> work than redoing our own group protocol from scratch?
>> > >>>>>
>> > >>>>> I don't want you guys to think I'm trying to block the entire
>> > direction
>> > >>>> of
>> > >>>>> this KIP. I'd just like to better understand your thoughts on this
>> > since
>> > >>>> I
>> > >>>>> assume you've discussed this at length internally. Just help me
>> > >>>> understand
>> > >>>>> why this huge proposal is justified (for my own sake and to
>> convince
>> > any
>> > >>>>> "others" who might be skeptical)
>> > >>>>>
>> > >>>>>
>> > >>>>> On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax <mj...@apache.org>
>> > wrote:
>> > >>>>>
>> > >>>>>> I still need to catch up on the discussion in general. But as
>> > promised,
>> > >>>>>> I just started KIP-1088 about the `KafkaClientSupplier` question.
>> > >>>>>>
>> > >>>>>> Looking forward to your feedback on the new KIP. I hope we can
>> get
>> > >>>>>> KIP-1088 done soon, to not block this KIP.
>> > >>>>>>
>> > >>>>>>
>> > >>>>>> -Matthias
>> > >>>>>>
>> > >>>>>> On 9/4/24 09:52, Lucas Brutschy wrote:
>> > >>>>>>> Hi Sophie,
>> > >>>>>>>
>> > >>>>>>> thanks for the questions and comments!
>> > >>>>>>>
>> > >>>>>>> S1. KIP-1071 does not need to add public interfaces to streams,
>> but
>> > >>>> it
>> > >>>>>>> does not (yet) make the streams protocol client pluggable.
>> Matthias
>> > >>>>>>> promised to write a KIP that will propose a solution for the
>> > >>>>>>> KafkaClientSupplier soon. He suggested keeping it separate from
>> > this
>> > >>>>>>> KIP, but we can treat it as a blocker. You are right that we
>> could
>> > >>>> try
>> > >>>>>>> fitting an explicit topic initialization in this interface, if
>> we
>> > >>>>>>> decide we absolutely need it now, although I'm not sure if that
>> > would
>> > >>>>>>> actually define a useful workflow for the users to explicitly
>> > update
>> > >>>> a
>> > >>>>>>> topology. Explicit initialization cannot be part of admin tools,
>> > >>>>>>> because we do not have the topology there. It could probably be
>> a
>> > >>>>>>> config that defaults to off, or a method on the KafkaStreams
>> > object?
>> > >>>>>>> To me, explicit initialization opens a broad discussion on how
>> to
>> > do
>> > >>>>>>> it, so I would like to avoid including it in this KIP. I guess
>> it
>> > >>>> also
>> > >>>>>>> depends on the outcome of the discussion in S8-S10.
>> > >>>>>>>
>> > >>>>>>> S2. It's hard to define the perfect default, and we should
>> probably
>> > >>>>>>> aim for safe operations first - since people can always up the
>> > limit.
>> > >>>>>>> I changed it to 20.
>> > >>>>>>>
>> > >>>>>>> S11. I was going to write we can add it, but it's actually quite
>> > >>>>>>> independent of KIP-1071 and also a nice newbie ticket, so I
>> would
>> > >>>>>>> propose we treat it as out-of-scope and let somebody define it
>> in a
>> > >>>>>>> separate KIP - any discussion about it would anyway be lost in
>> this
>> > >>>>>>> long discussion thread.
>> > >>>>>>>
>> > >>>>>>> S12. Member epoch behaves the same as in KIP-848. See the
>> KIP-848
>> > >>>>>>> explanation for basics:
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>
>> > >>>>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup
>> > >>>>>>>
>> > >>>>>>> Together with the section "Reconciliation of the group", this
>> > should
>> > >>>>>>> make clear how member epochs, group epochs and assignment epochs
>> > work
>> > >>>>>>> in KIP-1071, but I'll try to summarize the reconciliation a bit
>> > more
>> > >>>>>>> explicitly below:
>> > >>>>>>>
>> > >>>>>>> Essentially, the broker drives a reconciliation for each member
>> > >>>>>>> separately. The goal is to reconcile the assignment generated by
>> > the
>> > >>>>>>> assignor and reach the assignment epoch, but it's an
>> asynchronous
>> > >>>>>>> process, so each member may still be on an older epoch, which is
>> > the
>> > >>>>>>> member epoch. We do not reinvent any of the internal machinery
>> for
>> > >>>>>>> streams here, we are just adding standby tasks and warm-up
>> tasks to
>> > >>>>>>> the reconciliation of KIP-848.
>> > >>>>>>>
>> > >>>>>>> During reconciliation, the broker first asks the member to
>> revoke
>> > all
>> > >>>>>>> tasks that are not in its target assignment, by removing it from
>> > the
>> > >>>>>>> current assignment for the member (which is provided in the
>> > heartbeat
>> > >>>>>>> response). Until revocation is completed, the member remains on
>> the
>> > >>>>>>> old member epoch and in `UNREVOKED` state. Only once the member
>> > >>>>>>> confirms revocation of these tasks by removing them from the
>> set of
>> > >>>>>>> owned tasks in the heartbeat request, the member can transition
>> to
>> > >>>> the
>> > >>>>>>> current assignment epoch and get new tasks assigned. Once the
>> > member
>> > >>>>>>> has revoked all tasks that are not in its target assignment, it
>> > >>>>>>> transitions to the new epoch and gets new tasks assigned. There
>> are
>> > >>>>>>> some tasks that may be in the (declarative) target assignment of
>> > the
>> > >>>>>>> member generated by the assignor, but that may not be
>> immediately
>> > >>>>>>> added to the current assignment of the member, because they are
>> > still
>> > >>>>>>> "unreleased". Unreleased tasks are:
>> > >>>>>>>
>> > >>>>>>> - A. For any task, another instance of the same process may own
>> the
>> > >>>>>>> task in any role (active/standby/warm-up), and we are still
>> > awaiting
>> > >>>>>>> revocation.
>> > >>>>>>>
>> > >>>>>>> - B. For an active task, an instance of any process may own the
>> > same
>> > >>>>>>> active task, and we are still awaiting revocation.
>> > >>>>>>>
>> > >>>>>>> As long as there is at least one unreleased task, the member
>> > remains
>> > >>>>>>> in state UNRELEASED. Once all tasks of the target assignment are
>> > >>>> added
>> > >>>>>>> to the current assignment of the member (because they were
>> > released),
>> > >>>>>>> the member transitions to STABLE.
>> > >>>>>>>
>> > >>>>>>> Technically, the first constraint (we cannot assign a task in
>> two
>> > >>>>>>> different roles to the same process) is only relevant for
>> stateful
>> > >>>>>>> tasks that use the local state directory. I wonder if we should
>> > relax
>> > >>>>>>> this restriction slightly, by marking sub-topologies that access
>> > the
>> > >>>>>>> local state directory. This information could also be used by
>> the
>> > >>>>>>> assignor. But we can also introduce as a follow-up improvement.
>> > >>>>>>>
>> > >>>>>>> S13i. You are right, these bumps of the group epoch are "on
>> top" of
>> > >>>>>>> the KIP-848 reasons for bumping the group epoch. I added the
>> > KIP-848
>> > >>>>>>> reasons as well, because this was a bit confusing in the KIP.
>> > >>>>>>>
>> > >>>>>>> S13ii. The group epoch is bumped immediately once a warm-up task
>> > has
>> > >>>>>>> caught up, since this is what triggers running the assignor.
>> > However,
>> > >>>>>>> the assignment epoch is only bumped once the task assignor has
>> run.
>> > >>>> If
>> > >>>>>>> the task assignment for a member hasn't changed, it will
>> transition
>> > >>>> to
>> > >>>>>>> the new assignment epoch immediately.
>> > >>>>>>>
>> > >>>>>>> S13iii. In the case of a rolling bounce, the group epoch will be
>> > >>>>>>> bumped for every member that leaves and joins the group. For
>> > updating
>> > >>>>>>> rack ID, client tags, topology ID etc. -- this will mostly play
>> a
>> > >>>> role
>> > >>>>>>> for static membership, as you mentioned, but would cause the
>> same
>> > >>>>>>> thing - in a rolling bounce with static membership, we will
>> attempt
>> > >>>> to
>> > >>>>>>> recompute the assignment after every bounce, if the topology ID
>> is
>> > >>>>>>> changed in the members. This can cause broker load, as you
>> > mentioned.
>> > >>>>>>> Note, however, that we don't have to compute an assignment for
>> > every
>> > >>>>>>> group epoch bump. While KIP-848 computes its assignment as part
>> of
>> > >>>> the
>> > >>>>>>> main loop of the group coordinator, it uses asynchronous
>> assignment
>> > >>>> in
>> > >>>>>>> the case of client-side assignment, where the assignment is
>> slower.
>> > >>>> So
>> > >>>>>>> it will kick off the assignment on the client side, enter state
>> > >>>>>>> `ASSIGNING`, but may have more group epoch bumps in the meantime
>> > >>>> while
>> > >>>>>>> the assignment is computed on the client side. We leave it open
>> to
>> > >>>> use
>> > >>>>>>> the same mechanism on the broker to compute assignment
>> > asynchronously
>> > >>>>>>> on a separate thread, in case streams assignment is too slow for
>> > the
>> > >>>>>>> main loop of the group coordinator. We will also consider
>> > >>>>>>> rate-limiting the assignor to reduce broker load. However, it's
>> too
>> > >>>>>>> early to define this in detail, as we don't know much about the
>> > >>>>>>> performance impact yet.
>> > >>>>>>>
>> > >>>>>>> S13iv. The heartbeat just sends "null" instead of the actual
>> value
>> > in
>> > >>>>>>> case the value didn't change since the last heartbeat, so no
>> > >>>>>>> comparison is required. This is defined in the heartbeat RPC and
>> > the
>> > >>>>>>> same as in KIP-848.
>> > >>>>>>>
>> > >>>>>>> S13v. I think I disagree. The group epoch represents a version
>> of
>> > the
>> > >>>>>>> group state, including the current topology and the metadata of
>> all
>> > >>>>>>> members -- the whole input to the task assignor -- and defines
>> > >>>> exactly
>> > >>>>>>> the condition under which we attempt to recompute the
>> assignment.
>> > So
>> > >>>>>>> it's not overloaded, and has a clear definition. It's also
>> exactly
>> > >>>> the
>> > >>>>>>> same as in KIP-848, and we do not want to move away too far from
>> > that
>> > >>>>>>> protocol, unless it's something that is specific to streams that
>> > need
>> > >>>>>>> to be changed. Also, the protocol already uses three types of
>> > epochs
>> > >>>> -
>> > >>>>>>> member epoch, assignment epoch, group epoch. I don't think
>> adding
>> > >>>> more
>> > >>>>>>> epochs will make things clearer. Unless we have a strong reason
>> why
>> > >>>> we
>> > >>>>>>> need to add extra epochs, I'm not in favor.
>> > >>>>>>>
>> > >>>>>>> S14. Assignment epoch is the same as in KIP-848:
>> > >>>>>>>
>> > >>>>>>>
>> > >>>>>>
>> > >>>>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment
>> > >>>>>>>
>> > >>>>>>> Its definition does not change at all, so we didn't describe it
>> in
>> > >>>>>>> detail in KIP-1071.
>> > >>>>>>>
>> > >>>>>>> S15. I agree that the name is a bit generic, but the time to
>> change
>> > >>>>>>> this was when `group.instance.id` was introduced. Also, if we
>> > start
>> > >>>>>>> diverging from regular consumers / consumer groups in terms of
>> > >>>> naming,
>> > >>>>>>> it will not make things clearer. "application.id" and "group.id
>> "
>> > are
>> > >>>>>>> bad enough, in my eyes. I hope it's okay if I just improve the
>> > field
>> > >>>>>>> documentation.
>> > >>>>>>>
>> > >>>>>>> S16. This goes back to KIP-482. Essentially, in flexible request
>> > >>>>>>> schema versions you can define optional fields that will be
>> encoded
>> > >>>>>>> with a field tag on the wire, but can also be completely
>> omitted.
>> > >>>>>>> This is similar to how fields are encoded in, e.g., protobuf. It
>> > >>>>>>> improves schema evolution, because we can add new fields to the
>> > >>>> schema
>> > >>>>>>> without bumping the version. There is also a minor difference in
>> > >>>>>>> encoding size: when encoded, tagged fields take up more bytes on
>> > the
>> > >>>>>>> wire because of the field tag, but they can be omitted
>> completely.
>> > It
>> > >>>>>>> will become interesting when we want to include extra data after
>> > the
>> > >>>>>>> initial protocol definition.
>> > >>>>>>>
>> > >>>>>>> Cheers,
>> > >>>>>>> Lucas
>> > >>>>>>>
>> > >>>>>>> On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman
>> > >>>>>>> <sop...@responsive.dev> wrote:
>> > >>>>>>>>
>> > >>>>>>>> Ah, my bad -- I thought I refreshed the page to get the latest
>> > >>>> version
>> > >>>>>>>> which is why I was a bit confused when I couldn't find anything
>> > >>>> about
>> > >>>>>> the
>> > >>>>>>>> new tools which I had previously seen in the KIP. Sorry for the
>> > >>>>>> confusion
>> > >>>>>>>> and unnecessary questions
>> > >>>>>>>>
>> > >>>>>>>> S1.
>> > >>>>>>>>
>> > >>>>>>>>> You could imagine calling the initialize RPC
>> > >>>>>>>>> explicitly (to implement explicit initialization), but this
>> would
>> > >>>>>>>>> still mean sending an event to the background thread, and the
>> > >>>>>>>>> background thread in turn invokes the RPC. However, explicit
>> > >>>>>>>>> initialization would require some additional public interfaces
>> > >>>> that we
>> > >>>>>>>>> are not including in this KIP.
>> > >>>>>>>>
>> > >>>>>>>> I'm confused -- if we do all the group management stuff in a
>> > >>>> background
>> > >>>>>>>> thread to avoid needing public APIs, how do we pass all the
>> > >>>>>>>> Streams-specific and app-specific info to the broker for the
>> > >>>>>>>> StreamsGroupInitialize? I am guessing this is where we need to
>> > >>>> invoke
>> > >>>>>>>> internal APIs and is related to our discussion about removing
>> the
>> > >>>>>>>> KafkaClientSupplier. However, it seems to me that no matter how
>> > you
>> > >>>>>> look at
>> > >>>>>>>> it, Kafka Streams will need to pass information to and from the
>> > >>>>>> background
>> > >>>>>>>> thread if the background thread is to be aware of things like
>> > tasks,
>> > >>>>>> offset
>> > >>>>>>>> sums, repartition topics, etc
>> > >>>>>>>>
>> > >>>>>>>> We don't really need to rehash everything here since it seems
>> like
>> > >>>> the
>> > >>>>>>>> proposal to add a StreamsConsumer/StreamsAssignment interface
>> will
>> > >>>>>> address
>> > >>>>>>>> this, which I believe we're in agreement on. I just wanted to
>> > point
>> > >>>> out
>> > >>>>>>>> that this work to replace the KafkaClientSupplier is clearly
>> > >>>> intimately
>> > >>>>>>>> tied up with KIP-1071, and should imho be part of this KIP and
>> not
>> > >>>> its
>> > >>>>>> own
>> > >>>>>>>> standalone thing.
>> > >>>>>>>>
>> > >>>>>>>> By the way I'm happy to hop on a call to help move things
>> forward
>> > on
>> > >>>>>> that
>> > >>>>>>>> front (or anything really). Although I guess we're just waiting
>> > on a
>> > >>>>>> design
>> > >>>>>>>> right now?
>> > >>>>>>>>
>> > >>>>>>>> S2. I was suggesting something lower for the default upper
>> limit
>> > on
>> > >>>> all
>> > >>>>>>>> groups. I don't feel too strongly about it, just wanted to
>> point
>> > out
>> > >>>>>> that
>> > >>>>>>>> the tradeoff is not about the assignor runtime but rather about
>> > >>>> resource
>> > >>>>>>>> consumption, and that too many warmups could put undue load on
>> the
>> > >>>>>> cluster.
>> > >>>>>>>> In the end if we want to trust application operators then I'd
>> say
>> > >>>> it's
>> > >>>>>> fine
>> > >>>>>>>> to use a higher cluster-wide max like 100.
>> > >>>>>>>>
>> > >>>>>>>> S8-10 I'll get back to you on this in another followup since
>> I'm
>> > >>>> still
>> > >>>>>>>> thinking some things through and want to keep the discussion
>> > >>>> rolling for
>> > >>>>>>>> now. In the meantime I have some additional questions:
>> > >>>>>>>>
>> > >>>>>>>> S11. One of the main issues with unstable assignments today is
>> the
>> > >>>> fact
>> > >>>>>>>> that assignors rely on deterministic assignment and use the
>> > process
>> > >>>> Id,
>> > >>>>>>>> which is not configurable and only persisted via local disk in
>> a
>> > >>>>>>>> best-effort attempt. It would be a very small change to include
>> > >>>> this in
>> > >>>>>>>> KIP-1071 (like 5 LOC), WDYT? (Would even be willing to do the
>> PR
>> > for
>> > >>>>>> this
>> > >>>>>>>> myself so as not to add to your load). There's a ticket for it
>> > here:
>> > >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-15190
>> > >>>>>>>>
>> > >>>>>>>> S12. What exactly is the member epoch and how is it defined?
>> When
>> > >>>> is it
>> > >>>>>>>> bumped? I see in the HB request field that certain values
>> signal a
>> > >>>>>> member
>> > >>>>>>>> joining/leaving, but I take it there are valid positive values
>> > >>>> besides
>> > >>>>>> the
>> > >>>>>>>> 0/-1/-2 codes? More on this in the next question:
>> > >>>>>>>>
>> > >>>>>>>> S13. The KIP says the group epoch is updated in these three
>> cases:
>> > >>>>>>>> a. Every time the topology is updated through the
>> > >>>>>> StreamsGroupInitialize API
>> > >>>>>>>> b. When a member with an assigned warm-up task reports a task
>> > >>>> changelog
>> > >>>>>>>> offset and task changelog end offset whose difference is less
>> that
>> > >>>>>>>> acceptable.recovery.lag.
>> > >>>>>>>> c. When a member updates its topology ID, rack ID, client tags
>> or
>> > >>>>>> process
>> > >>>>>>>> ID. Note: Typically, these do not change within the lifetime
>> of a
>> > >>>>>> Streams
>> > >>>>>>>> client, so this only happens when a member with static
>> membership
>> > >>>>>> rejoins
>> > >>>>>>>> with an updated configuration.
>> > >>>>>>>>
>> > >>>>>>>> S13.i First, just to clarify, these are not the *only* times
>> the
>> > >>>> group
>> > >>>>>>>> epoch is bumped but rather the additional cases on top of the
>> > >>>> regular
>> > >>>>>>>> consumer group protocol -- so it's still bumped when a new
>> member
>> > >>>> joins
>> > >>>>>> or
>> > >>>>>>>> leaves, etc, right?
>> > >>>>>>>> S13.ii Second, does (b) imply the epoch is bumped just whenever
>> > the
>> > >>>>>> broker
>> > >>>>>>>> notices a task has finished warming up, or does it first
>> reassign
>> > >>>> the
>> > >>>>>> task
>> > >>>>>>>> and then bump the group epoch as a result of this task
>> > reassignment?
>> > >>>>>>>> S13.iii Third, (a) mentions the group epoch is bumped on each
>> > >>>>>>>> StreamsGroupInitialize API but (c) says it gets bumped
>> whenever a
>> > >>>> member
>> > >>>>>>>> updates its topology ID. Does this mean that in the event of a
>> > >>>> rolling
>> > >>>>>>>> upgrade that changes the topology, the group epoch is bumped
>> just
>> > >>>> once
>> > >>>>>> or
>> > >>>>>>>> for each member that updates its topology ID? Or does the
>> > "updating
>> > >>>> its
>> > >>>>>>>> topology ID" somehow have something to do with static
>> membership?
>> > >>>>>>>> S13.iv How does the broker know when a member has changed its
>> rack
>> > >>>> ID,
>> > >>>>>>>> client tags, etc -- does it compare these values on every hb??
>> > That
>> > >>>>>> seems
>> > >>>>>>>> like a lot of broker load. If this is all strictly for static
>> > >>>> membership
>> > >>>>>>>> then have we considered leaving static membership support for a
>> > >>>> followup
>> > >>>>>>>> KIP? Not necessarily advocating for that, just wondering if
>> this
>> > was
>> > >>>>>>>> thought about already. Imo it would be acceptable to not
>> support
>> > >>>> static
>> > >>>>>>>> membership in the first version (especially if we fixed other
>> > issues
>> > >>>>>> like
>> > >>>>>>>> making the process Id configurable to get actual stable
>> > >>>> assignments!)
>> > >>>>>>>> S13.v I'm finding the concept of group epoch here to be
>> somewhat
>> > >>>>>>>> overloaded. It sounds like it's trying to keep track of
>> multiple
>> > >>>> things
>> > >>>>>>>> within a single version: the group membership, the topology
>> > >>>> version, the
>> > >>>>>>>> assignment version, and various member statuses (eg rack
>> ID/client
>> > >>>>>> tags).
>> > >>>>>>>> Personally, I think it would be extremely valuable to unroll
>> all
>> > of
>> > >>>> this
>> > >>>>>>>> into separate epochs with clear definitions and boundaries and
>> > >>>> triggers.
>> > >>>>>>>> For example, I would suggest something like this:
>> > >>>>>>>>
>> > >>>>>>>> group epoch: describes the group at a point in time, bumped
>> when
>> > >>>> set of
>> > >>>>>>>> member ids changes (ie static or dynamic member leaves or
>> joins)
>> > --
>> > >>>>>>>> triggered by group coordinator noticing a change in group
>> > membership
>> > >>>>>>>> topology epoch: describes the topology version, bumped for each
>> > >>>>>> successful
>> > >>>>>>>> StreamsGroupInitialize that changes the broker's topology ID --
>> > >>>>>> triggered
>> > >>>>>>>> by group coordinator bumping the topology ID (ie
>> > >>>> StreamsGroupInitialize)
>> > >>>>>>>> (optional) member epoch: describes the member version, bumped
>> > when a
>> > >>>>>>>> memberId changes its rack ID, client tags, or process ID (maybe
>> > >>>> topology
>> > >>>>>>>> ID, not sure) -- is this needed only for static membership?
>> Going
>> > >>>> back
>> > >>>>>> to
>> > >>>>>>>> question S12, when is the member epoch is bumped in the current
>> > >>>>>> proposal?
>> > >>>>>>>> assignment epoch: describes the assignment version, bumped when
>> > the
>> > >>>>>>>> assignor runs successfully (even if the actual assignment of
>> tasks
>> > >>>> does
>> > >>>>>> not
>> > >>>>>>>> change) -- can be triggered by tasks completing warmup, a
>> manual
>> > >>>>>>>> client-side trigger (future KIP only), etc (by definition is
>> > bumped
>> > >>>> any
>> > >>>>>>>> time any of the other epochs are bumped since they all trigger
>> a
>> > >>>>>>>> reassignment)
>> > >>>>>>>>
>> > >>>>>>>> S14. The KIP also mentions a new concept of an  "assignment
>> epoch"
>> > >>>> but
>> > >>>>>>>> doesn't seem to ever elaborate on how this is defined and what
>> > it's
>> > >>>> used
>> > >>>>>>>> for. I assume it's bumped each time the assignment changes and
>> > >>>>>> represents a
>> > >>>>>>>> sort of "assignment version", is that right? Can you describe
>> in
>> > >>>> more
>> > >>>>>>>> detail the difference between the group epoch and the
>> assignment
>> > >>>> epoch?
>> > >>>>>>>>
>> > >>>>>>>> S15. I assume the "InstanceId" field in the HB request
>> corresponds
>> > >>>> to
>> > >>>>>> the
>> > >>>>>>>> group.instance.id config of static membership. I always felt
>> this
>> > >>>> field
>> > >>>>>>>> name was too generic and easy to get mixed up with other
>> > >>>> identifiers,
>> > >>>>>> WDYT
>> > >>>>>>>> about "StaticId" or "StaticInstanceId" to make the static
>> > membership
>> > >>>>>>>> relation more explicit?
>> > >>>>>>>>
>> > >>>>>>>> S16. what is the "flexibleVersions" field in the RPC
>> protocols? I
>> > >>>> assume
>> > >>>>>>>> this is related to versioning compatibility but it's not quite
>> > clear
>> > >>>>>> from
>> > >>>>>>>> the KIP how this is to be used. For example if we modify the
>> > >>>> protocol by
>> > >>>>>>>> adding new fields at the end, we'd bump the version but should
>> the
>> > >>>>>>>> flexibleVersions field change as well?
>> > >>>>>>>>
>> > >>>>>>>> On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy
>> > >>>>>>>> <lbruts...@confluent.io.invalid> wrote:
>> > >>>>>>>>
>> > >>>>>>>>> Hi Sophie,
>> > >>>>>>>>>
>> > >>>>>>>>> Thanks for your detailed comments - much appreciated! I think
>> you
>> > >>>> read
>> > >>>>>>>>> a version of the KIP that did not yet include the admin
>> > >>>> command-line
>> > >>>>>>>>> tool and the Admin API extensions, so some of the comments are
>> > >>>> already
>> > >>>>>>>>> addressed in the KIP.
>> > >>>>>>>>>
>> > >>>>>>>>> S1. StreamsGroupHeartbeat and StreamsGroupInitialize are
>> called
>> > in
>> > >>>> the
>> > >>>>>>>>> consumer background thread. Note that in the new consumer
>> > threading
>> > >>>>>>>>> model, all RPCs are run by the background thread. Check out
>> this:
>> > >>>>>>>>>
>> > >>>>>>>>>
>> > >>>>>>
>> > >>>>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
>> > >>>>>>>>> for more information. Both our RPCs are just part of the group
>> > >>>>>>>>> membership management and do not need to be invoked
>> explicitly by
>> > >>>> the
>> > >>>>>>>>> application thread. You could imagine calling the initialize
>> RPC
>> > >>>>>>>>> explicitly (to implement explicit initialization), but this
>> would
>> > >>>>>>>>> still mean sending an event to the background thread, and the
>> > >>>>>>>>> background thread in turn invokes the RPC. However, explicit
>> > >>>>>>>>> initialization would require some additional public interfaces
>> > >>>> that we
>> > >>>>>>>>> are not including in this KIP. StreamsGroupDescribe is called
>> by
>> > >>>> the
>> > >>>>>>>>> AdminClient, and used by the command-line tool
>> > >>>>>>>>> kafka-streams-groups.sh.
>> > >>>>>>>>>
>> > >>>>>>>>> S2. I think the max.warmup.replicas=100 suggested by Nick was
>> > >>>> intended
>> > >>>>>>>>> as the upper limit for setting the group configuration on the
>> > >>>> broker.
>> > >>>>>>>>> Just to make sure that this was not a misunderstanding. By
>> > default,
>> > >>>>>>>>> values above 100 should be rejected when setting a specific
>> value
>> > >>>> for
>> > >>>>>>>>> group. Are you suggesting 20 or 30 for the default value for
>> > >>>> groups,
>> > >>>>>>>>> or the default upper limit for the group configuration?
>> > >>>>>>>>>
>> > >>>>>>>>> S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION.
>> The
>> > >>>>>>>>> MemberEpoch=-1 is a left-over from an earlier discussion. It
>> > means
>> > >>>>>>>>> that the member is leaving the group, so the intention was
>> that
>> > the
>> > >>>>>>>>> member must leave the group when it asks the other members to
>> > shut
>> > >>>>>>>>> down. We later reconsidered this and decided that all
>> > applications
>> > >>>>>>>>> should just react to the shutdown application signal that is
>> > >>>> returned
>> > >>>>>>>>> by the broker, so the client first sets the
>> ShutdownApplication
>> > and
>> > >>>>>>>>> later leaves the group. Thanks for spotting this, I removed
>> it.
>> > >>>>>>>>>
>> > >>>>>>>>> S4. Not sure if this refers to the latest version of the KIP.
>> We
>> > >>>> added
>> > >>>>>>>>> an extension of the admin API and a kafka-streams-groups.sh
>> > >>>>>>>>> command-line tool to the KIP already.
>> > >>>>>>>>>
>> > >>>>>>>>> S5. All RPCs for dealing with offsets will keep working with
>> > >>>> streams
>> > >>>>>>>>> groups. The extension of the admin API is rather cosmetic,
>> since
>> > >>>> the
>> > >>>>>>>>> method names use "consumer group". The RPCs, however, are
>> generic
>> > >>>> and
>> > >>>>>>>>> do not need to be changed.
>> > >>>>>>>>>
>> > >>>>>>>>> S6. Yes, you can use the DeleteGroup RPC with any group ID,
>> > whether
>> > >>>>>>>>> streams group or not.
>> > >>>>>>>>>
>> > >>>>>>>>> S7. See the admin API section.
>> > >>>>>>>>>
>> > >>>>>>>>> S8. I guess for both A and B, I am not sure what you are
>> > >>>> suggesting.
>> > >>>>>>>>> Do you want to make the broker-side topology immutable and not
>> > >>>> include
>> > >>>>>>>>> any information about the topology, like the topology ID in
>> the
>> > >>>> RPC?
>> > >>>>>>>>> It would seem that this would be a massive food-gun for
>> people,
>> > if
>> > >>>>>>>>> they start changing their topology and don't notice that the
>> > >>>> broker is
>> > >>>>>>>>> looking at a completely different version of the topology. Or
>> did
>> > >>>> you
>> > >>>>>>>>> mean that there is some kind of topology ID, so that at least
>> we
>> > >>>> can
>> > >>>>>>>>> detect inconsistencies between broker and client-side
>> topologies,
>> > >>>> and
>> > >>>>>>>>> we fence out any member with an incorrect topology ID? Then we
>> > >>>> seem to
>> > >>>>>>>>> end up with mostly the same RPCs and the same questions (how
>> is
>> > the
>> > >>>>>>>>> topology ID generated?). I agree that the latter could be an
>> > >>>> option.
>> > >>>>>>>>> See summary at the end of this message.
>> > >>>>>>>>>
>> > >>>>>>>>> S9. If we flat out refuse topology updates, agree - we cannot
>> let
>> > >>>> the
>> > >>>>>>>>> application crash on minor changes to the topology. However,
>> if
>> > we
>> > >>>>>>>>> allow topology updates as described in the KIP, there are only
>> > >>>> upsides
>> > >>>>>>>>> to making the topology ID more sensitive. All it will cause is
>> > >>>> that a
>> > >>>>>>>>> client will have to resend a `StreamsGroupInitialize`, and
>> during
>> > >>>> the
>> > >>>>>>>>> rolling bounce, older clients will not get tasks assigned.
>> > >>>>>>>>>
>> > >>>>>>>>> S10. The intention in the KIP is really just this - old
>> clients
>> > can
>> > >>>>>>>>> only retain tasks, but not get new ones. If the topology has
>> > >>>>>>>>> sub-topologies, these will initially be assigned to the new
>> > >>>> clients,
>> > >>>>>>>>> which also have the new topology. You are right that rolling
>> an
>> > >>>>>>>>> application where the old structure and the new structure are
>> > >>>>>>>>> incompatible (e.g. different subtopologies access the same
>> > >>>> partition)
>> > >>>>>>>>> will cause problems. But this will also cause problems in the
>> > >>>> current
>> > >>>>>>>>> protocol, so I'm not sure, if it's strictly a regression, it's
>> > just
>> > >>>>>>>>> unsupported (which we can only make the best effort to
>> detect).
>> > >>>>>>>>>
>> > >>>>>>>>> In summary, for the topology ID discussion, I mainly see two
>> > >>>> options:
>> > >>>>>>>>> 1) stick with the current KIP proposal
>> > >>>>>>>>> 2) define the topology ID as the hash of the
>> > StreamsGroupInitialize
>> > >>>>>>>>> topology metadata only, as you suggested, and fence out
>> members
>> > >>>> with
>> > >>>>>>>>> an incompatible topology ID.
>> > >>>>>>>>> I think doing 2 is also a good option, but in the end it comes
>> > >>>> down to
>> > >>>>>>>>> how important we believe topology updates (where the set of
>> > >>>>>>>>> sub-topologies or set of internal topics are affected) to be.
>> The
>> > >>>> main
>> > >>>>>>>>> goal is to make the new protocol a drop-in replacement for the
>> > old
>> > >>>>>>>>> protocol, and at least define the RPCs in a way that we won't
>> > need
>> > >>>>>>>>> another KIP which bumps the RPC version to make the protocol
>> > >>>>>>>>> production-ready. Adding more command-line tools, or public
>> > >>>> interfaces
>> > >>>>>>>>> seems less critical as it doesn't change the protocol and can
>> be
>> > >>>> done
>> > >>>>>>>>> easily later on. The main question becomes - is the protocol
>> > >>>>>>>>> production-ready and sufficient to replace the classic
>> protocol
>> > if
>> > >>>> we
>> > >>>>>>>>> go with 2?
>> > >>>>>>>>>
>> > >>>>>>>>> Cheers,
>> > >>>>>>>>> Lucas
>> > >>>>>>>>>
>> > >>>>>>
>> > >>>>
>> > >>>>
>> > >>>
>> > >>
>> >
>> >
>>
>

Reply via email to