Actually, scratch that. On reflection, I think I prefer Bruno's original idea of specifying it in the configuration.
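
Just to make that concrete, here is a rough Java sketch of how Bruno's CI flow (quoted below) might look. Both the describe call that returns the topology epoch and the topology.epoch Streams config are still only proposals at this point, so the helper and config names below are purely hypothetical:

    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.streams.StreamsConfig;

    public class TopologyEpochBump {

        public static Properties nextEpochConfig(Properties adminProps, String applicationId) {
            final int currentEpoch;
            // Step 1: ask the group coordinator for the group's current topology epoch.
            try (Admin admin = Admin.create(adminProps)) {
                // Hypothetical helper standing in for the proposed "streams group describe" RPC.
                currentEpoch = describeCurrentTopologyEpoch(admin, applicationId);
            }
            // Step 2: configure the new deployment with current epoch + 1.
            Properties streamsProps = new Properties();
            streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
            // "topology.epoch" is the config name proposed in this thread; it does not exist today.
            streamsProps.put("topology.epoch", Integer.toString(currentEpoch + 1));
            // Step 3: roll the application with streamsProps.
            return streamsProps;
        }

        private static int describeCurrentTopologyEpoch(Admin admin, String applicationId) {
            // Placeholder: the describe RPC carrying the topology epoch is not defined yet.
            throw new UnsupportedOperationException("proposed streams group describe RPC");
        }
    }

The nice property is that the only thing an operator's pipeline has to do is read the current epoch and bump it by one before rolling.
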
Cheers, Nick On Sat, 16 Nov 2024 at 17:59, Nick Telford <nick.telf...@gmail.com> wrote: > Hey everyone, > > With respect to Bruno's proposal, could instances cache their topology > epoch on disk, and then upgrades/downgrades would simply involve deleting > the cached epoch before starting the instance? > > My thinking is that this might be potentially simpler for users than > modifying configuration. > > Cheers, > Nick > > On Sat, 16 Nov 2024, 00:41 Sophie Blee-Goldman, <sop...@responsive.dev> > wrote: > >> Thanks for the updates! Few minor questions before we wrap this up and >> move >> to a vote: >> >> S1. Can you clarify the outward-facing behavior of a group that enters the >> NOT_READY state due to, say, missing source topics? It sounds like the >> application will continue to run without processing anything, which would >> be a change compared to today where we shut down with an explicit error. >> Am >> I reading this correctly and is this change intentional? >> >> S2. Regarding topology upgrades, I am fully in favor of Bruno's proposal. >> Just feel like we need to flesh it out a bit. For example you say "no >> specific tooling" is needed, but how would a user go about upgrading the >> topology? If we can just describe this process in a bit more detail then I >> would feel good. >> >> S3. Lastly, just a quick reminder, we should make sure to clarify in the >> KIP how the topology ID will be computed since that impacts which kind of >> upgrades are possible and is therefore part of the public API. IIUC the >> plan is to compute it from the topics which makes sense to me (though I do >> think it should eventually be pluggable, perhaps in a followup KIP) >> >> On Thu, Nov 7, 2024 at 10:08 AM Bruno Cadonna <cado...@apache.org> wrote: >> >> > Hi Lucas and all others, >> > >> > Thanks for the proposals regarding the topology upgrades! >> > >> > I have an additional proposal: >> > >> > We introduce a topology epoch alongside the topology ID. The topology >> > epoch is set to 0 on the group coordinator for a new group. The topology >> > epoch is also a config within the Streams client, the default is 1. >> > >> > When the Streams client starts up, it sends its topology epoch alongside >> > the topology ID and the topology to the group coordinator. The group >> > coordinator initializes the topology with the given topology ID and >> > increases the topology epoch to 1. >> > >> > When the client wants to upgrade a topology it needs to send a valid >> > topology epoch alongside the new topology ID and the topology. We can >> > decide if the new topology epoch = current topology at group coordinator >> > + 1 or just new topology epoch > current topology on group coordinator. >> > >> > When the group coordinator receives a topology ID that differs from the >> > one intialized for the group, the group coordinator only upgrades to the >> > new topology if the topology epoch is valid. Then, it sets the topology >> > epoch to the new value. If the topology epoch is not valid, that means >> > the topology ID is an old one and the group coordinator does not >> > re-initialize the topology. >> > >> > The current topology epoch can be retrieved with the describe RPC. >> > >> > Accidental rollbacks of topologies are not be possible. Only wanted >> > rollbacks of topologies with an appropriate topology epoch are allowed. >> > >> > Specific tooling is not needed. >> > >> > The topology upgrade can be automated in CI in the following way: >> > >> > 1. 
Call streams group describe to determine the current epoch. >> > >> > 2. Set the Streams client config topology.epoch = current epoch + 1 >> > >> > 3. Roll the application. >> > >> > WDYT? >> > >> > Best, >> > Bruno >> > >> > >> > On 07.11.24 18:26, Lucas Brutschy wrote: >> > > Hi all, >> > > >> > > I have updated the KIP with some details around how we handle the >> > > cases when essential topics required by the topology are not present. >> > > This is described in a new section "Handling topic topology >> > > mismatches". The short summary is that we enter a state where the >> > > group member's heartbeats will succeed, but no assignments will be >> > > made, and the nature of the topic misconfiguration is communicated to >> > > the members using the `status` field of the heartbeat response. >> > > >> > > Working towards putting up this KIP up for a vote, there was one last >> > > concern mentioned during our last sync, which was a concern raised by >> > > Sophie and others around topology updating. I would like to propose >> > > the following changes to the KIP to solidify the story around topology >> > > updating. >> > > >> > > To recap, a member sends its topology metadata every time it attempts >> > > to join a streams group. When the group already has a topology set, >> > > the KIP proposes to update the topology for the group immediately with >> > > that first heartbeat of a member. All other members will then receive >> > > a non-fatal group status INCONSISTENT_TOPOLOGY, letting them know that >> > > their topology is not equal to the groups topology anymore. Those >> > > members with an outdated topology will be able to retain their current >> > > tasks, but will not receive new tasks, until they have also updated >> > > their topology. This enables a rolling bounce to upgrade the topology. >> > > >> > > There is a concern with this behavior, raised both in our discussions >> > > but also by Guozhang and Sophie. The problem is that a “misbehaving” >> > > member can always roll back a topology upgrade, if it is still running >> > > with the old version of the topology. For example, if we roll 20 >> > > members to a new topology version, and we have successfully restarted >> > > 19 out of 20, but the last member is restarted with the old topology, >> > > this will cause the topology version on the broker to be rolled back. >> > > This could happen, for example, because topology update fails on the >> > > last member, or the last member runs into a fatal error of some kind >> > > (out-of-memory or being fenced, for example). Once the failed member >> > > rejoins, it updates the topology on the broker to the “old” topology, >> > > so performs an involuntary roll-back. Now, we would be in a situation >> > > where 19 out of 20 members are blocked from getting new tasks >> > > assigned. We would be in this state just temporary, until the final, >> > > misbehaving member is restarted correctly with the new topology. >> > > >> > > The original suggestion in the KIP discussion to solve these issues >> > > (for now) was to exclude topology updates from the KIP. However, it >> > > was confirmed by several people (such as Anna) that topology updates >> > > are quite common in the wild, so for having a production-ready >> > > implementation of the new protocol, a mechanism for topology updates >> > > is required. It is not clear how strict the requirement for rolling >> > > updates in the first version of the protocol. No ‘better’ alternative >> > > for updating topologies was suggested. 
>> > > >> > > Proposal 1) Making topology updating optional: We modify the heartbeat >> > > request to allow joining a group without sending a topology (only a >> > > topology ID). When joining a group this way, the member is explicitly >> > > requesting to not update the topology of the group. In other words, >> > > leaving the topology as null, will mean that no topology update will >> > > be performed on the broker side. If the topology ID does not match the >> > > topology ID of the group, and no topology update is performed, and the >> > > member that joined will just behave like any other member with an >> > > inconsistent topology ID. That is, it will not get any tasks assigned. >> > > Streams clients will never send the topology when they know they are >> > > just rejoining the group, e.g. because the client was fenced and >> > > dropped out of the group. That means, a topology update is only >> > > initiated when a streams application is first started up. This will >> > > prevent "accidental" topology downgrades if we know that we are >> > > rejoining the group. However, if we rejoin the group without knowing >> > > that we are just rejoining (e.g. since the whole pod was restarted), >> > > we could still run into temporary topology downgrades. >> > > >> > > Proposal 2) Disable topology updates by default on the client: >> > > Second, we can disable topology updates by default from the client >> > > side, by extending the streams configuration with a config >> > > enable_topology_updates which defaults to false and extending the >> > > heartbeat RPC by a flag UpdateTopology which propagates that config >> > > value to the broker. The advantage of having a client-side config is, >> > > that, during a roll, I can set the configuration only for updated >> > > clients, which can ensure that no accidental rollbacks happen. The >> > > downside is that after the roll, I would have to do a second roll to >> > > disable enable_topology_updates again, so that we are safe during the >> > > next roll again. However, avoiding the second roll is more of a >> > > usability optimization, which is conceptually more complex and could >> > > address in a follow-up KIP, as in proposal 3. >> > > >> > > Proposal 3) Fixing the topology ID during upgrades (follow-up KIP): To >> > > fully prevent accidental downgrades, one could introduce a broker-side >> > > group-level config that restricts the possible topology IDs that a >> > > client can upgrade to. This would be quite similar in safety as making >> > > topology updates manual. For updating a topology, the user would have >> > > to first, extract the topology ID from the update topology - this >> > > could e.g. be logged when the application is started, second, set the >> > > group-level config using an admin client (something like >> > > group.streams.allowed_upgrade_topology_id), third, roll the >> > > application. This would require quite some extra tooling (how to get >> > > the topology ID), so I’d propose introducing this in a follow-up KIP. >> > > >> > > Let me know if there are any concerns with this approach. >> > > >> > > Cheers, >> > > Lucas >> > > >> > > On Mon, Oct 7, 2024 at 9:42 AM Bruno Cadonna <cado...@apache.org> >> wrote: >> > >> >> > >> Hi all, >> > >> >> > >> we did some major changes to this KIP that we would like the >> community >> > >> to review. The changes are the following: >> > >> >> > >> 1. We merged the Streams group initialize RPC into the Streams group >> > >> heartbeat RPC. 
We decided to merge them because it simplifies the >> > >> sychronization between initialization and heartbeat during group >> > >> creation and migration from a classic group. A consequence of the >> merge >> > >> is that the heartbeat now needs permissions to create and describe >> > >> topics which before only the initialize call had. But we think that >> is >> > >> not a big deal. A consumer of a Streams client will send the >> metadata of >> > >> its topology in its first heartbeat when it joins the group. After >> that >> > >> it will not send the topology metadata anymore. >> > >> >> > >> 2. We discovered that to create the internal topics, the topology >> > >> metadata also need to include the co-partition groups since we cannot >> > >> infer the co-partitioning solely from the sub-topologies and the >> input >> > >> topics of the sub-topologies. >> > >> >> > >> 3. We added a section that describes how the migration from a classic >> > >> group to a streams group and back works. This is very similar to how >> the >> > >> migration in KIP-848 works. >> > >> >> > >> >> > >> Please give us feedback about the changes and the KIP in general. >> > >> >> > >> We would like to start a vote on this KIP as soon as possible. >> > >> >> > >> Best, >> > >> Bruno >> > >> >> > >> On 11.09.24 01:06, Sophie Blee-Goldman wrote: >> > >>> Just following up to say thank you Lucas for the detailed >> > explanations, and >> > >>> especially the thorough response to my more "existential" questions >> > about >> > >>> building off 848 vs introducing a separate Streams group protocol. >> This >> > >>> really helped me understand the motivations behind this decision. No >> > notes! >> > >>> >> > >>> On Fri, Sep 6, 2024 at 7:17 AM Lucas Brutschy >> > >>> <lbruts...@confluent.io.invalid> wrote: >> > >>> >> > >>>> Hi Sophie, >> > >>>> >> > >>>> thanks for getting deeply involved in this discussion. >> > >>>> >> > >>>> S16. Adding tagged fields would not require an RPC version bump at >> > >>>> all. This should already a cover a lot of use cases where >> requestion >> > >>>> versioning wouldn't become an issue, as long as it is safe to >> ignore >> > >>>> the new fields. Other than that, I'm not aware of a formal >> definition >> > >>>> of the what request version bumps mean in relation to Apache Kafka >> > >>>> versions - if anybody knows, please let me know. In my >> understanding, >> > >>>> the Kafka protocol evolves independently of AK versions, and for >> the >> > >>>> Kafka protocol to change, you require an accepted KIP, and not an >> AK >> > >>>> release. The grey area is that a protocol change without an AK >> release >> > >>>> does not mean much, because even if cloud vendors deploy it before >> an >> > >>>> AK release, virtually no clients would support it and the protocol >> > >>>> would fall back to the earlier RPC version. So maybe this isn't >> that >> > >>>> clearly formalized because it does not matter much in practice. >> > >>>> Normally, I would expect an RPC version bump after each KIP that >> > >>>> breaks RPC compatibility. But there seems to be some flexibility >> here >> > >>>> - for example, the KIP-848 RPCs were changed after the fact in >> trunk, >> > >>>> because they were not exposed yet by default in AK (or any other >> > >>>> implementation of the Kafka protocol), so a breaking change without >> > >>>> request version bump was probably not deemed problematic. >> > >>>> >> > >>>> S17/18. 
We can add an informational table for the interface to the >> > >>>> assignor, and that should also make clear how a client-side >> assignment >> > >>>> interface would look. Generally, our idea would be to send all the >> > >>>> input information for the task assignor to a client and expect all >> the >> > >>>> output information of a task assignor in return, very similar to >> > >>>> KIP-848. We did not include such a table because we do not define a >> > >>>> public interface to define custom broker-side assignors in this >> KIP. >> > >>>> But generally, the input / output of the (task) assignor mostly >> > >>>> follows the data fields sent to / from the brokers. Let me do that, >> > >>>> but first I'll respond to the rest of your questions that seem more >> > >>>> important. >> > >>>> >> > >>>> S19. I think an explicit trigger for rebalance could make sense, >> but >> > >>>> let me explain the thinking why we haven't included this in the >> > >>>> current KIP: We don't want to include it for client-side assignment >> > >>>> obviously, because this KIP does not include client-side >> assignment. >> > >>>> For triggering a rebalance when a warm-up has caught up, this is >> > >>>> treated differently in the KIP: the task assignor is called when a >> > >>>> client sends updated task offsets. The client does not update its >> task >> > >>>> offsets regularly, but according to an interval >> > >>>> `task.offset.interval.ms`, but also immediately when a client >> notices >> > >>>> that a warm-up has caught up. So effectively, it is mostly the >> client >> > >>>> that is constantly checking the lag. I absolutely believe that the >> > >>>> client should be checking for `acceptable.recovery.lag`, and we are >> > >>>> using the presence of the task offset field to trigger >> reassignment. >> > >>>> What we weren't sure is how often we need the task offsets to be >> > >>>> updated and how often we want to retrigger the assignment because >> of >> > >>>> that. An explicit field for "forcing" a reassignment may still make >> > >>>> sense, so that we can update the task offsets on the broker more >> often >> > >>>> than we call the assignor. However, I'm not convinced that this >> should >> > >>>> just be a generic "rebalance trigger", as this seems to be going >> > >>>> against the goals of KIP-848. Generally, I think one of the >> principles >> > >>>> of KIP-848 is that a member just describes its current state and >> > >>>> metadata, and the group coordinator makes a decision on whether >> > >>>> reassignment needs to be triggered. So it would seem more natural >> to >> > >>>> just indicate a boolean "I have caught-up warm-up tasks" to the >> group >> > >>>> coordinator. I understand that for client-side assignment, you may >> > >>>> want to trigger assignment explicitely without the group >> coordinator >> > >>>> understanding why, but then we should add such a field as part of >> the >> > >>>> client-side assignment KIP. >> > >>>> >> > >>>> S20. Yes, inside the group coordinator, the main differences are >> > >>>> topology initialization handling, and the semantics of task >> assignment >> > >>>> vs. partition assignment. It will not be a completely separate >> group >> > >>>> coordinator, and we will not change things like the offset reading >> and >> > >>>> writing. 
What will have its own code path is the handling of >> > >>>> heartbeats and the reconciliation of the group (so, which tasks are >> > >>>> sent to which member at which point in time until we reach the >> > >>>> declarative target assignment that was produced by a client-side or >> > >>>> server-side assignor). Also, the assignor implementations will >> > >>>> obviously be completely separate from the consumer group assignors, >> > >>>> but that is already the case currently. Streams groups will >> co-exist >> > >>>> in the group coordinator with share groups (for queues), classic >> > >>>> groups (old generic protocol for consumers and connect) and >> consumer >> > >>>> groups (from KIP-848), and potentially a new group type for >> connect as >> > >>>> well. Each of these group types have a separate code path for heart >> > >>>> beating, reconciling assignments, keeping track of the internal >> state >> > >>>> of the group and responding to specific requests from admin API, >> like >> > >>>> ConsumerGroupDescribe. For streams, there will be a certain amount >> of >> > >>>> duplication from consumer groups - for example, liveness tracking >> for >> > >>>> members (setting timers for the session timeout etc.) is >> essentially >> > >>>> the same between both groups, and the reconciliation logic is quite >> > >>>> similar. We will make an effort to refactor as much of this code >> in a >> > >>>> way that it can be reused by both group types, and a lot of things >> > >>>> have already been made generic with the introduction of share >> groups >> > >>>> in the group coordinator. >> > >>>> >> > >>>> S21. It's nice that we are discussing "why are we doing this" in >> > >>>> question 21 :D. I think we are touching upon the reasoning in the >> KIP, >> > >>>> but let me explain the reasoning in more detail. You are right, the >> > >>>> reason we want to implement custom RPCs for Kafka Streams is not >> about >> > >>>> broker-side vs. client-side assignment. The broker could implement >> the >> > >>>> Member Metadata versioning scheme of Kafka Streams, intercept >> consumer >> > >>>> group heartbeats that look like Kafka Streams heartbeats, >> deserialize >> > >>>> the custom metadata and run a broker-side assignor for Kafka >> Streams, >> > >>>> and finally serialize custom Kafka Streams assignment metadata, all >> > >>>> based on KIP-848 RPCs. >> > >>>> >> > >>>> First of all, the main difference we see with custom RPCs is really >> > >>>> that streams groups become a first-level concept for both the >> broker >> > >>>> and clients of the Kafka. This also goes into the answer for S20 >> (what >> > >>>> is different beside topology initialization and task >> reconciliation). >> > >>>> With streams groups, I can use the Admin API call to see the >> current >> > >>>> assignment, the target assignment, all the metadata necessary for >> > >>>> assignment, and some information about the current topology in a >> > >>>> human-readable and machine-readable form, instead of having to >> parse >> > >>>> it from logs on some client machine that may or may not have the >> right >> > >>>> log-level configured. This will help humans to understand what is >> > >>>> going on, but can also power automated tools / CLIs / UIs, and I >> think >> > >>>> this is a major improvement for operating a streams application. >> > >>>> Again, this is not about who is in control of the broker or the >> > >>>> clients, the information becomes accessible for everybody who can >> call >> > >>>> the Admin API. 
It also does not depend on the cloud vendor, it >> will be >> > >>>> an improvement available for pure AK users, and will also work with >> > >>>> client-side assignment (although obviously, if there is an >> intention >> > >>>> to start encoding proprietary data in black-box byte-arrays, this >> > >>>> information remains hidden to users of the open source distribution >> > >>>> again). >> > >>>> >> > >>>> I also think that the original proposal of reusing the new consumer >> > >>>> group protocol for Kafka Streams has some downsides, that, while >> not >> > >>>> being deal-breakers, would make the new behaviour of Kafka Streams >> > >>>> less than ideal. One example is the reconciliation: if we have a >> > >>>> custom stream partition assignor (client- or broker-side doesn't >> > >>>> matter), we can only control the target assignment that the group >> > >>>> converges to, but none of the intermediate states that the group >> > >>>> transitions through while attempting to reach the desired end >> state. >> > >>>> For example, there is no way to prevent the reconciliation loop of >> > >>>> KIP-848 from giving an active task to processX before another >> instance >> > >>>> on processX has revoked the corresponding standby task. There is no >> > >>>> synchronization point where these movements are done at the same >> time >> > >>>> as in the classic protocol, and on the broker we can't prevent >> this, >> > >>>> because the reconciliation loop in KIP-848 does not know what a >> > >>>> standby task is, or what a process ID is. You could argue, you can >> let >> > >>>> the application run into LockExceptions in the meantime, but it >> will >> > >>>> be one of the things that will make Kafka Streams work less than >> > >>>> ideal. Another example is task offsets. It is beneficial for >> streams >> > >>>> partition assignors to have relatively recent task offset >> information, >> > >>>> which, in KIP-848, needs to be sent to the assignor using a >> black-box >> > >>>> MemberMetadata byte array. This member metadata byte array is >> > >>>> persisted to the consumer offset topic every time it changes (for >> > >>>> failover). So we end up with a bad trade-off: if we send the task >> > >>>> offset information to the broker very frequently, we will have to >> > >>>> write those member metadata records very frequently as well. If we >> do >> > >>>> not send them frequently, streams task assignors will have very >> > >>>> outdated lag information. In KIP-1071 we can make the decision to >> not >> > >>>> persist the task offset information, which avoids this trade-off. >> It >> > >>>> gives us up-to-date lag information in the usual case, but after >> > >>>> fail-over of the group coordinator, our lag information may be >> missing >> > >>>> for a short period of time. These are two examples, but I think the >> > >>>> general theme is, that with custom RPCs, Kafka Streams is in >> control >> > >>>> of how the group metadata is processed and how the tasks are >> > >>>> reconciled. This is not just a consideration for now, but also the >> > >>>> future: will KIP-848 reconciliation and metadata management always >> > >>>> work "good enough" for Kafka Streams, or will there be use cases >> where >> > >>>> we want to evolve and customize the behaviour of Kafka Streams >> groups >> > >>>> independently of consumer groups. 
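>> > >>>>
>> > >>>> Just to make that constraint concrete, here is a rough sketch (purely
>> > >>>> illustrative, not actual group coordinator code) of the process-level
>> > >>>> release rule that a Streams-aware reconciliation has to enforce before
>> > >>>> handing out a task. It corresponds to the "unreleased tasks" rules A and
>> > >>>> B from my earlier reply, quoted further down this thread; the type and
>> > >>>> parameter names are made up for the example:
>> > >>>>
>> > >>>> import java.util.Map;
>> > >>>> import java.util.Set;
>> > >>>>
>> > >>>> final class ReleaseRuleSketch {
>> > >>>>     // Illustrative inputs: ownedAnyRole / ownedActive map a member id to the
>> > >>>>     // tasks that member still owns and has not yet revoked; processOf maps a
>> > >>>>     // member id to its process id. For brevity this ignores the target member itself.
>> > >>>>     static boolean isReleasedFor(String task,
>> > >>>>                                  boolean assigningAsActive,
>> > >>>>                                  String targetProcess,
>> > >>>>                                  Map<String, String> processOf,
>> > >>>>                                  Map<String, Set<String>> ownedAnyRole,
>> > >>>>                                  Map<String, Set<String>> ownedActive) {
>> > >>>>         for (String member : processOf.keySet()) {
>> > >>>>             // Rule B: an active task cannot be handed out while any member, on any
>> > >>>>             // process, still owns that task as an active task.
>> > >>>>             if (assigningAsActive
>> > >>>>                     && ownedActive.getOrDefault(member, Set.of()).contains(task)) {
>> > >>>>                 return false;
>> > >>>>             }
>> > >>>>             // Rule A: a task cannot be handed to a member of a process while another
>> > >>>>             // member of the same process still owns it in any role (active/standby/warm-up).
>> > >>>>             if (processOf.get(member).equals(targetProcess)
>> > >>>>                     && ownedAnyRole.getOrDefault(member, Set.of()).contains(task)) {
>> > >>>>                 return false;
>> > >>>>             }
>> > >>>>         }
>> > >>>>         return true; // released: the task may be added to the member's current assignment
>> > >>>>     }
>> > >>>> }
>> > >>>>
>> > >>>> The point is that evaluating either rule requires knowing the process ID of
>> > >>>> every member and which of its owned tasks are standbys or warm-ups --
>> > >>>> information the generic KIP-848 reconciliation loop simply does not have.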
>> > >>>> >> > >>>> A final point that I would like to make, I don't think that custom >> > >>>> streams RPCs are inherently a lot harder to do for the Kafka >> Streams >> > >>>> community than "just utilizing" KIP-848, at least not as much as >> you >> > >>>> would think. Many things in KIP-848, such customizable assignment >> > >>>> metadata with a metadata versioning scheme are made to look like >> > >>>> general-purpose mechanisms, but are effectively mechanisms for >> Kafka >> > >>>> Streams and also completely unimplemented at this point in any >> > >>>> implementation of the Kafka protocol. We know that there is very >> > >>>> little use of custom partition assignors in Kafka besides Kafka >> > >>>> Streams, and we can expect the same to be true of these new >> > >>>> "customizable" features of KIP-848. So, besides the Kafka Streams >> > >>>> community, there is no one driving the implementation of these >> > >>>> features. "Just utilizing" KIP-848 for Kafka Streams would >> > >>>> effectively mean to implement all this, and still make a lot of >> deep >> > >>>> changes to Kafka Streams (as mechanisms like probing rebalances >> etc. >> > >>>> don't make sense in KIP-848). I think adapting KIP-848 to work with >> > >>>> Kafka Streams either way is a big effort. Piggybacking on consumer >> > >>>> group RPCs and extending the consumer group protocol to the point >> that >> > >>>> it can carry Kafka Streams on its back would not make things >> > >>>> significantly easier. Some things, like online migration from the >> old >> > >>>> protocol to the new protocol or reimplementing metadata versioning >> on >> > >>>> top of KIP-848, may even be harder in KIP-848, than in KIP-1071. I >> > >>>> don't think we can consider consumer group RPCs as a shortcut to >> > >>>> revamp the streams rebalance protocol, but should be focussing on >> what >> > >>>> makes Kafka Streams work best down the road. >> > >>>> >> > >>>> Hope that makes sense! >> > >>>> >> > >>>> Cheers, >> > >>>> Lucas >> > >>>> >> > >>>> On Fri, Sep 6, 2024 at 6:47 AM Sophie Blee-Goldman >> > >>>> <sop...@responsive.dev> wrote: >> > >>>>> >> > >>>>> Thanks for another detailed response Lucas! Especially w.r.t how >> the >> > >>>> epochs >> > >>>>> are defined. I also went back and re-read KIP-848 to refresh >> myself, >> > I >> > >>>>> guess it wasn't clear to me how much of the next-gen consumer >> > protocol we >> > >>>>> are reusing vs what's being built from the ground up. >> > >>>>> >> > >>>>> S16. >> > >>>>> >> > >>>>>> It will become interesting when we want to include extra data >> > after >> > >>>> the >> > >>>>>> initial protocol definition. >> > >>>>> >> > >>>>> I guess that's what I'm asking us to define -- how do we evolve >> the >> > >>>> schema >> > >>>>> when expanding the protocol? I think we need to set a few public >> > >>>> contracts >> > >>>>> here, such as whether/which versions get bumped when. For example >> if >> > I >> > >>>>> add/modify fields twice within a single Kafka release, do we still >> > need >> > >>>> to >> > >>>>> update the version? 
I think the answer is "yes" since I know you >> > >>>> Confluent >> > >>>>> folks upgrade brokers more often than AK releases, but this is >> > different >> > >>>>> from how eg the SubscriptionInfo/AssignmentInfo of the >> > >>>>> StreamsPartitionAssignor works today, hence I want to put all >> that in >> > >>>>> writing (for those of us who aren't familiar with how all this >> works >> > >>>>> already and may be modifying it in the future, say, to add >> > client-side >> > >>>>> assignment :P) >> > >>>>> >> > >>>>> Maybe a quick example of what to do to evolve this schema is >> > sufficient? >> > >>>>> >> > >>>>> S17. >> > >>>>> The KIP seems to throw the assignor, group coordinator, topic >> > >>>>> initialization, and topology versioning all into a single black >> box >> > on >> > >>>> the >> > >>>>> broker. I understand much of this is intentionally being glossed >> > over as >> > >>>> an >> > >>>>> implementation detail since it's a KIP, but it would really help >> to >> > tease >> > >>>>> apart these distinct players and define their interactions. For >> > example, >> > >>>>> what are the inputs and outputs to the assignor? For example >> could we >> > >>>> get a >> > >>>>> table like the "Data Model >> > >>>>> < >> > >>>> >> > >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel >> > >>>>> " >> > >>>>> in 848? >> > >>>>> >> > >>>>> KIP-848 goes into detail on these things, but again, it's unclear >> > what >> > >>>>> parts of 848 are and aren't supposed to be carried through. For >> > example >> > >>>> 848 >> > >>>>> has a concept of assignor "reasons" and client-side assignors and >> > >>>> assignor >> > >>>>> metadata which can all trigger a rebalance/group epoch bump. >> However >> > in >> > >>>> the >> > >>>>> current plan for 1071 none of these exist. Which leads me to my >> next >> > >>>>> point... >> > >>>>> >> > >>>>> S18. >> > >>>>> Many people use a managed Kafka service like Confluent Cloud where >> > >>>> brokers >> > >>>>> are on the bleeding edge but clients are upgraded less frequently. >> > >>>> However, >> > >>>>> the opposite is also true for a great many users: client >> > applications are >> > >>>>> easy to upgrade and bumped often to get new features, whereas the >> > brokers >> > >>>>> (often run by a different team altogether) are considered riskier >> to >> > >>>>> upgrade and stay on one version for a long time. We can argue >> about >> > which >> > >>>>> case is more common -- in my personal experience it's actually the >> > >>>> latter, >> > >>>>> although I only realized this after my time at Confluent which was >> > >>>>> obviously skewed towards meeting Confluent Cloud users -- but >> > regardless, >> > >>>>> we obviously need to support both cases as first-class citizens. >> And >> > I'm >> > >>>> a >> > >>>>> bit concerned that there's not enough in the KIP to ensure that >> > >>>> client-side >> > >>>>> assignment won't end up getting hacked into the protocol as an >> > >>>> afterthought. >> > >>>>> >> > >>>>> Don't get me wrong, I'm still not demanding you guys implement >> > >>>> client-side >> > >>>>> assignors as part of this KIP. I'm happy to leave the >> implementation >> > of >> > >>>>> that to a followup, but I feel we need to flesh out the basic >> > direction >> > >>>> for >> > >>>>> this right now, as part of the v0 Streams protocol. 
>> > >>>>> >> > >>>>> I'm also not asking you to do this alone, and am happy to work >> > with/for >> > >>>> you >> > >>>>> on this. I have one possible proposal sketched out already, >> although >> > >>>> before >> > >>>>> I share that I just wanted to make sure we're aligned on the >> > fundamentals >> > >>>>> and have mapped out the protocol in full first (ie S17). Just >> want to >> > >>>> have >> > >>>>> a rough plan for how clients-side assignment will fit into the >> > picture, >> > >>>> and >> > >>>>> right now it's difficult to define because it's unclear where the >> > >>>>> boundaries between the group coordinator and broker-side assignor >> > are (eg >> > >>>>> what are assignor inputs and outputs). >> > >>>>> >> > >>>>> I'm committed to ensuring this doesn't add extra work or delay to >> > this >> > >>>> KIP >> > >>>>> for you guys. >> > >>>>> >> > >>>>> S19. >> > >>>>> I think it could make sense to include a concept like the >> "assignor >> > >>>>> reason" from 848 as a top-level field and rebalance trigger (ie >> > group >> > >>>>> epoch bump). This would mainly be for the client-side assignor to >> > >>>> trigger a >> > >>>>> rebalance, but for another example, it seems like we want the >> > brokers to >> > >>>> be >> > >>>>> constantly monitoring warmup tasks and processing lag updates to >> know >> > >>>> when >> > >>>>> a warmup task is to be switched, which is heavy work. Why not just >> > let >> > >>>> the >> > >>>>> client who owns the warmup and knows the lag decide when it has >> > warmed up >> > >>>>> and signal the broker? It's a small thing for a client to check >> when >> > its >> > >>>>> task gets within the acceptable recovery lag and notify via the >> > heartbeat >> > >>>>> if so, but it seems like a rather big deal for the broker to be >> > >>>> constantly >> > >>>>> computing these checks for every task on every hb. Just food for >> > thought, >> > >>>>> I don't really have a strong preference over where the warmup >> > decision >> > >>>>> happens -- simply pointing it out that it fits very neatly into >> the >> > >>>>> "assignor reason"/"rebalance requested" framework. Making this a >> > >>>>> client-side decision also makes it easier to evolve and customize >> > since >> > >>>>> Kafka Streams users tend to have more control over >> > upgrading/implementing >> > >>>>> stuff in their client application vs the brokers (which is often a >> > >>>>> different team altogether) >> > >>>>> >> > >>>>> S20. >> > >>>>> To what extent do we plan to reuse implementations of 848 in >> order to >> > >>>> build >> > >>>>> the new Streams group protocol? I'm still having some trouble >> > >>>> understanding >> > >>>>> where we are just adding stuff on top and where we plan to rebuild >> > from >> > >>>> the >> > >>>>> ground up. For example we seem to share most of the epoch >> > reconciliation >> > >>>>> stuff but will have different triggers for the group epoch bump. >> Is >> > it >> > >>>>> possible to just "plug in" the group epoch and let the consumer's >> > group >> > >>>>> coordinator figure stuff out from there? Will we have a completely >> > >>>> distinct >> > >>>>> Streams coordinator? I know this is an implementation detail but >> imo >> > it's >> > >>>>> ok to get in the weeds a bit for a large and complex KIP such as >> > this. 
>> > >>>>> >> > >>>>> Setting aside the topology initialization stuff, is the main >> > difference >> > >>>>> really just in how the group coordinator will be aware of tasks >> for >> > >>>> Streams >> > >>>>> groups vs partitions for consumer groups? >> > >>>>> >> > >>>>> Which brings me to a high level question I feel I have to ask: >> > >>>>> >> > >>>>> S21. >> > >>>>> Can you go into more detail on why we have to provide Kafka >> Streams >> > its >> > >>>> own >> > >>>>> RPCs as opposed to just utilizing the protocol from KIP-848? The >> > Rejected >> > >>>>> Alternatives does address this but the main argument there seems >> to >> > be in >> > >>>>> implementing a broker-side assignor. But can't you do this with >> 848? >> > And >> > >>>>> given there are users who rarely upgrade their brokers just as >> there >> > are >> > >>>>> users who rarely upgrade their clients, it seems silly to do all >> this >> > >>>> work >> > >>>>> and change the fundamental direction of Kafka Streams just to give >> > more >> > >>>>> control over the assignment to the brokers. >> > >>>>> >> > >>>>> For a specific example, the KIP says "Client-side assignment >> makes it >> > >>>> hard >> > >>>>> to debug and tune the assignment logic" -- personally, I would >> find >> > it >> > >>>>> infinitely more difficult to tune and debug assignment logic if it >> > were >> > >>>>> happening on the broker. But that's just because I work with the >> app >> > >>>> devs, >> > >>>>> not the cluster operators. The point is we shouldn't design an >> entire >> > >>>>> protocol to give preference to one vendor or another -- nobody >> wants >> > the >> > >>>>> ASF to start yelling at us lol D: >> > >>>>> >> > >>>>> Another example, which imo is a good one (IIUC) is that "a simple >> > >>>> parameter >> > >>>>> change requires redeploying of all streams clients". That's >> > definitely a >> > >>>>> huge bummer, but again, why would broker-side assignors not be >> able >> > to >> > >>>>> trigger a reassignment based on a custom config in the 848 >> protocol? >> > We >> > >>>>> could still have the ha and sticky assignors implemented on the >> > brokers, >> > >>>>> and could still define all the Streams configs as broker configs >> to >> > be >> > >>>>> passed into the streams custom broker assignors. That feels like a >> > lot >> > >>>> less >> > >>>>> work than redoing our own group protocol from scratch? >> > >>>>> >> > >>>>> I don't want you guys to think I'm trying to block the entire >> > direction >> > >>>> of >> > >>>>> this KIP. I'd just like to better understand your thoughts on this >> > since >> > >>>> I >> > >>>>> assume you've discussed this at length internally. Just help me >> > >>>> understand >> > >>>>> why this huge proposal is justified (for my own sake and to >> convince >> > any >> > >>>>> "others" who might be skeptical) >> > >>>>> >> > >>>>> >> > >>>>> On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax <mj...@apache.org> >> > wrote: >> > >>>>> >> > >>>>>> I still need to catch up on the discussion in general. But as >> > promised, >> > >>>>>> I just started KIP-1088 about the `KafkaClientSupplier` question. >> > >>>>>> >> > >>>>>> Looking forward to your feedback on the new KIP. I hope we can >> get >> > >>>>>> KIP-1088 done soon, to not block this KIP. >> > >>>>>> >> > >>>>>> >> > >>>>>> -Matthias >> > >>>>>> >> > >>>>>> On 9/4/24 09:52, Lucas Brutschy wrote: >> > >>>>>>> Hi Sophie, >> > >>>>>>> >> > >>>>>>> thanks for the questions and comments! >> > >>>>>>> >> > >>>>>>> S1. 
KIP-1071 does not need to add public interfaces to streams, >> but >> > >>>> it >> > >>>>>>> does not (yet) make the streams protocol client pluggable. >> Matthias >> > >>>>>>> promised to write a KIP that will propose a solution for the >> > >>>>>>> KafkaClientSupplier soon. He suggested keeping it separate from >> > this >> > >>>>>>> KIP, but we can treat it as a blocker. You are right that we >> could >> > >>>> try >> > >>>>>>> fitting an explicit topic initialization in this interface, if >> we >> > >>>>>>> decide we absolutely need it now, although I'm not sure if that >> > would >> > >>>>>>> actually define a useful workflow for the users to explicitly >> > update >> > >>>> a >> > >>>>>>> topology. Explicit initialization cannot be part of admin tools, >> > >>>>>>> because we do not have the topology there. It could probably be >> a >> > >>>>>>> config that defaults to off, or a method on the KafkaStreams >> > object? >> > >>>>>>> To me, explicit initialization opens a broad discussion on how >> to >> > do >> > >>>>>>> it, so I would like to avoid including it in this KIP. I guess >> it >> > >>>> also >> > >>>>>>> depends on the outcome of the discussion in S8-S10. >> > >>>>>>> >> > >>>>>>> S2. It's hard to define the perfect default, and we should >> probably >> > >>>>>>> aim for safe operations first - since people can always up the >> > limit. >> > >>>>>>> I changed it to 20. >> > >>>>>>> >> > >>>>>>> S11. I was going to write we can add it, but it's actually quite >> > >>>>>>> independent of KIP-1071 and also a nice newbie ticket, so I >> would >> > >>>>>>> propose we treat it as out-of-scope and let somebody define it >> in a >> > >>>>>>> separate KIP - any discussion about it would anyway be lost in >> this >> > >>>>>>> long discussion thread. >> > >>>>>>> >> > >>>>>>> S12. Member epoch behaves the same as in KIP-848. See the >> KIP-848 >> > >>>>>>> explanation for basics: >> > >>>>>>> >> > >>>>>>> >> > >>>>>> >> > >>>> >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup >> > >>>>>>> >> > >>>>>>> Together with the section "Reconciliation of the group", this >> > should >> > >>>>>>> make clear how member epochs, group epochs and assignment epochs >> > work >> > >>>>>>> in KIP-1071, but I'll try to summarize the reconciliation a bit >> > more >> > >>>>>>> explicitly below: >> > >>>>>>> >> > >>>>>>> Essentially, the broker drives a reconciliation for each member >> > >>>>>>> separately. The goal is to reconcile the assignment generated by >> > the >> > >>>>>>> assignor and reach the assignment epoch, but it's an >> asynchronous >> > >>>>>>> process, so each member may still be on an older epoch, which is >> > the >> > >>>>>>> member epoch. We do not reinvent any of the internal machinery >> for >> > >>>>>>> streams here, we are just adding standby tasks and warm-up >> tasks to >> > >>>>>>> the reconciliation of KIP-848. >> > >>>>>>> >> > >>>>>>> During reconciliation, the broker first asks the member to >> revoke >> > all >> > >>>>>>> tasks that are not in its target assignment, by removing it from >> > the >> > >>>>>>> current assignment for the member (which is provided in the >> > heartbeat >> > >>>>>>> response). Until revocation is completed, the member remains on >> the >> > >>>>>>> old member epoch and in `UNREVOKED` state. 
Only once the member >> > >>>>>>> confirms revocation of these tasks by removing them from the >> set of >> > >>>>>>> owned tasks in the heartbeat request, the member can transition >> to >> > >>>> the >> > >>>>>>> current assignment epoch and get new tasks assigned. Once the >> > member >> > >>>>>>> has revoked all tasks that are not in its target assignment, it >> > >>>>>>> transitions to the new epoch and gets new tasks assigned. There >> are >> > >>>>>>> some tasks that may be in the (declarative) target assignment of >> > the >> > >>>>>>> member generated by the assignor, but that may not be >> immediately >> > >>>>>>> added to the current assignment of the member, because they are >> > still >> > >>>>>>> "unreleased". Unreleased tasks are: >> > >>>>>>> >> > >>>>>>> - A. For any task, another instance of the same process may own >> the >> > >>>>>>> task in any role (active/standby/warm-up), and we are still >> > awaiting >> > >>>>>>> revocation. >> > >>>>>>> >> > >>>>>>> - B. For an active task, an instance of any process may own the >> > same >> > >>>>>>> active task, and we are still awaiting revocation. >> > >>>>>>> >> > >>>>>>> As long as there is at least one unreleased task, the member >> > remains >> > >>>>>>> in state UNRELEASED. Once all tasks of the target assignment are >> > >>>> added >> > >>>>>>> to the current assignment of the member (because they were >> > released), >> > >>>>>>> the member transitions to STABLE. >> > >>>>>>> >> > >>>>>>> Technically, the first constraint (we cannot assign a task in >> two >> > >>>>>>> different roles to the same process) is only relevant for >> stateful >> > >>>>>>> tasks that use the local state directory. I wonder if we should >> > relax >> > >>>>>>> this restriction slightly, by marking sub-topologies that access >> > the >> > >>>>>>> local state directory. This information could also be used by >> the >> > >>>>>>> assignor. But we can also introduce as a follow-up improvement. >> > >>>>>>> >> > >>>>>>> S13i. You are right, these bumps of the group epoch are "on >> top" of >> > >>>>>>> the KIP-848 reasons for bumping the group epoch. I added the >> > KIP-848 >> > >>>>>>> reasons as well, because this was a bit confusing in the KIP. >> > >>>>>>> >> > >>>>>>> S13ii. The group epoch is bumped immediately once a warm-up task >> > has >> > >>>>>>> caught up, since this is what triggers running the assignor. >> > However, >> > >>>>>>> the assignment epoch is only bumped once the task assignor has >> run. >> > >>>> If >> > >>>>>>> the task assignment for a member hasn't changed, it will >> transition >> > >>>> to >> > >>>>>>> the new assignment epoch immediately. >> > >>>>>>> >> > >>>>>>> S13iii. In the case of a rolling bounce, the group epoch will be >> > >>>>>>> bumped for every member that leaves and joins the group. For >> > updating >> > >>>>>>> rack ID, client tags, topology ID etc. -- this will mostly play >> a >> > >>>> role >> > >>>>>>> for static membership, as you mentioned, but would cause the >> same >> > >>>>>>> thing - in a rolling bounce with static membership, we will >> attempt >> > >>>> to >> > >>>>>>> recompute the assignment after every bounce, if the topology ID >> is >> > >>>>>>> changed in the members. This can cause broker load, as you >> > mentioned. >> > >>>>>>> Note, however, that we don't have to compute an assignment for >> > every >> > >>>>>>> group epoch bump. 
While KIP-848 computes its assignment as part >> of >> > >>>> the >> > >>>>>>> main loop of the group coordinator, it uses asynchronous >> assignment >> > >>>> in >> > >>>>>>> the case of client-side assignment, where the assignment is >> slower. >> > >>>> So >> > >>>>>>> it will kick off the assignment on the client side, enter state >> > >>>>>>> `ASSIGNING`, but may have more group epoch bumps in the meantime >> > >>>> while >> > >>>>>>> the assignment is computed on the client side. We leave it open >> to >> > >>>> use >> > >>>>>>> the same mechanism on the broker to compute assignment >> > asynchronously >> > >>>>>>> on a separate thread, in case streams assignment is too slow for >> > the >> > >>>>>>> main loop of the group coordinator. We will also consider >> > >>>>>>> rate-limiting the assignor to reduce broker load. However, it's >> too >> > >>>>>>> early to define this in detail, as we don't know much about the >> > >>>>>>> performance impact yet. >> > >>>>>>> >> > >>>>>>> S13iv. The heartbeat just sends "null" instead of the actual >> value >> > in >> > >>>>>>> case the value didn't change since the last heartbeat, so no >> > >>>>>>> comparison is required. This is defined in the heartbeat RPC and >> > the >> > >>>>>>> same as in KIP-848. >> > >>>>>>> >> > >>>>>>> S13v. I think I disagree. The group epoch represents a version >> of >> > the >> > >>>>>>> group state, including the current topology and the metadata of >> all >> > >>>>>>> members -- the whole input to the task assignor -- and defines >> > >>>> exactly >> > >>>>>>> the condition under which we attempt to recompute the >> assignment. >> > So >> > >>>>>>> it's not overloaded, and has a clear definition. It's also >> exactly >> > >>>> the >> > >>>>>>> same as in KIP-848, and we do not want to move away too far from >> > that >> > >>>>>>> protocol, unless it's something that is specific to streams that >> > need >> > >>>>>>> to be changed. Also, the protocol already uses three types of >> > epochs >> > >>>> - >> > >>>>>>> member epoch, assignment epoch, group epoch. I don't think >> adding >> > >>>> more >> > >>>>>>> epochs will make things clearer. Unless we have a strong reason >> why >> > >>>> we >> > >>>>>>> need to add extra epochs, I'm not in favor. >> > >>>>>>> >> > >>>>>>> S14. Assignment epoch is the same as in KIP-848: >> > >>>>>>> >> > >>>>>>> >> > >>>>>> >> > >>>> >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment >> > >>>>>>> >> > >>>>>>> Its definition does not change at all, so we didn't describe it >> in >> > >>>>>>> detail in KIP-1071. >> > >>>>>>> >> > >>>>>>> S15. I agree that the name is a bit generic, but the time to >> change >> > >>>>>>> this was when `group.instance.id` was introduced. Also, if we >> > start >> > >>>>>>> diverging from regular consumers / consumer groups in terms of >> > >>>> naming, >> > >>>>>>> it will not make things clearer. "application.id" and "group.id >> " >> > are >> > >>>>>>> bad enough, in my eyes. I hope it's okay if I just improve the >> > field >> > >>>>>>> documentation. >> > >>>>>>> >> > >>>>>>> S16. This goes back to KIP-482. Essentially, in flexible request >> > >>>>>>> schema versions you can define optional fields that will be >> encoded >> > >>>>>>> with a field tag on the wire, but can also be completely >> omitted. >> > >>>>>>> This is similar to how fields are encoded in, e.g., protobuf. 
It >> > >>>>>>> improves schema evolution, because we can add new fields to the >> > >>>> schema >> > >>>>>>> without bumping the version. There is also a minor difference in >> > >>>>>>> encoding size: when encoded, tagged fields take up more bytes on >> > the >> > >>>>>>> wire because of the field tag, but they can be omitted >> completely. >> > It >> > >>>>>>> will become interesting when we want to include extra data after >> > the >> > >>>>>>> initial protocol definition. >> > >>>>>>> >> > >>>>>>> Cheers, >> > >>>>>>> Lucas >> > >>>>>>> >> > >>>>>>> On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman >> > >>>>>>> <sop...@responsive.dev> wrote: >> > >>>>>>>> >> > >>>>>>>> Ah, my bad -- I thought I refreshed the page to get the latest >> > >>>> version >> > >>>>>>>> which is why I was a bit confused when I couldn't find anything >> > >>>> about >> > >>>>>> the >> > >>>>>>>> new tools which I had previously seen in the KIP. Sorry for the >> > >>>>>> confusion >> > >>>>>>>> and unnecessary questions >> > >>>>>>>> >> > >>>>>>>> S1. >> > >>>>>>>> >> > >>>>>>>>> You could imagine calling the initialize RPC >> > >>>>>>>>> explicitly (to implement explicit initialization), but this >> would >> > >>>>>>>>> still mean sending an event to the background thread, and the >> > >>>>>>>>> background thread in turn invokes the RPC. However, explicit >> > >>>>>>>>> initialization would require some additional public interfaces >> > >>>> that we >> > >>>>>>>>> are not including in this KIP. >> > >>>>>>>> >> > >>>>>>>> I'm confused -- if we do all the group management stuff in a >> > >>>> background >> > >>>>>>>> thread to avoid needing public APIs, how do we pass all the >> > >>>>>>>> Streams-specific and app-specific info to the broker for the >> > >>>>>>>> StreamsGroupInitialize? I am guessing this is where we need to >> > >>>> invoke >> > >>>>>>>> internal APIs and is related to our discussion about removing >> the >> > >>>>>>>> KafkaClientSupplier. However, it seems to me that no matter how >> > you >> > >>>>>> look at >> > >>>>>>>> it, Kafka Streams will need to pass information to and from the >> > >>>>>> background >> > >>>>>>>> thread if the background thread is to be aware of things like >> > tasks, >> > >>>>>> offset >> > >>>>>>>> sums, repartition topics, etc >> > >>>>>>>> >> > >>>>>>>> We don't really need to rehash everything here since it seems >> like >> > >>>> the >> > >>>>>>>> proposal to add a StreamsConsumer/StreamsAssignment interface >> will >> > >>>>>> address >> > >>>>>>>> this, which I believe we're in agreement on. I just wanted to >> > point >> > >>>> out >> > >>>>>>>> that this work to replace the KafkaClientSupplier is clearly >> > >>>> intimately >> > >>>>>>>> tied up with KIP-1071, and should imho be part of this KIP and >> not >> > >>>> its >> > >>>>>> own >> > >>>>>>>> standalone thing. >> > >>>>>>>> >> > >>>>>>>> By the way I'm happy to hop on a call to help move things >> forward >> > on >> > >>>>>> that >> > >>>>>>>> front (or anything really). Although I guess we're just waiting >> > on a >> > >>>>>> design >> > >>>>>>>> right now? >> > >>>>>>>> >> > >>>>>>>> S2. I was suggesting something lower for the default upper >> limit >> > on >> > >>>> all >> > >>>>>>>> groups. I don't feel too strongly about it, just wanted to >> point >> > out >> > >>>>>> that >> > >>>>>>>> the tradeoff is not about the assignor runtime but rather about >> > >>>> resource >> > >>>>>>>> consumption, and that too many warmups could put undue load on >> the >> > >>>>>> cluster. 
>> > >>>>>>>> In the end if we want to trust application operators then I'd >> say >> > >>>> it's >> > >>>>>> fine >> > >>>>>>>> to use a higher cluster-wide max like 100. >> > >>>>>>>> >> > >>>>>>>> S8-10 I'll get back to you on this in another followup since >> I'm >> > >>>> still >> > >>>>>>>> thinking some things through and want to keep the discussion >> > >>>> rolling for >> > >>>>>>>> now. In the meantime I have some additional questions: >> > >>>>>>>> >> > >>>>>>>> S11. One of the main issues with unstable assignments today is >> the >> > >>>> fact >> > >>>>>>>> that assignors rely on deterministic assignment and use the >> > process >> > >>>> Id, >> > >>>>>>>> which is not configurable and only persisted via local disk in >> a >> > >>>>>>>> best-effort attempt. It would be a very small change to include >> > >>>> this in >> > >>>>>>>> KIP-1071 (like 5 LOC), WDYT? (Would even be willing to do the >> PR >> > for >> > >>>>>> this >> > >>>>>>>> myself so as not to add to your load). There's a ticket for it >> > here: >> > >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-15190 >> > >>>>>>>> >> > >>>>>>>> S12. What exactly is the member epoch and how is it defined? >> When >> > >>>> is it >> > >>>>>>>> bumped? I see in the HB request field that certain values >> signal a >> > >>>>>> member >> > >>>>>>>> joining/leaving, but I take it there are valid positive values >> > >>>> besides >> > >>>>>> the >> > >>>>>>>> 0/-1/-2 codes? More on this in the next question: >> > >>>>>>>> >> > >>>>>>>> S13. The KIP says the group epoch is updated in these three >> cases: >> > >>>>>>>> a. Every time the topology is updated through the >> > >>>>>> StreamsGroupInitialize API >> > >>>>>>>> b. When a member with an assigned warm-up task reports a task >> > >>>> changelog >> > >>>>>>>> offset and task changelog end offset whose difference is less >> that >> > >>>>>>>> acceptable.recovery.lag. >> > >>>>>>>> c. When a member updates its topology ID, rack ID, client tags >> or >> > >>>>>> process >> > >>>>>>>> ID. Note: Typically, these do not change within the lifetime >> of a >> > >>>>>> Streams >> > >>>>>>>> client, so this only happens when a member with static >> membership >> > >>>>>> rejoins >> > >>>>>>>> with an updated configuration. >> > >>>>>>>> >> > >>>>>>>> S13.i First, just to clarify, these are not the *only* times >> the >> > >>>> group >> > >>>>>>>> epoch is bumped but rather the additional cases on top of the >> > >>>> regular >> > >>>>>>>> consumer group protocol -- so it's still bumped when a new >> member >> > >>>> joins >> > >>>>>> or >> > >>>>>>>> leaves, etc, right? >> > >>>>>>>> S13.ii Second, does (b) imply the epoch is bumped just whenever >> > the >> > >>>>>> broker >> > >>>>>>>> notices a task has finished warming up, or does it first >> reassign >> > >>>> the >> > >>>>>> task >> > >>>>>>>> and then bump the group epoch as a result of this task >> > reassignment? >> > >>>>>>>> S13.iii Third, (a) mentions the group epoch is bumped on each >> > >>>>>>>> StreamsGroupInitialize API but (c) says it gets bumped >> whenever a >> > >>>> member >> > >>>>>>>> updates its topology ID. Does this mean that in the event of a >> > >>>> rolling >> > >>>>>>>> upgrade that changes the topology, the group epoch is bumped >> just >> > >>>> once >> > >>>>>> or >> > >>>>>>>> for each member that updates its topology ID? Or does the >> > "updating >> > >>>> its >> > >>>>>>>> topology ID" somehow have something to do with static >> membership? 
>> > >>>>>>>> S13.iv How does the broker know when a member has changed its >> rack >> > >>>> ID, >> > >>>>>>>> client tags, etc -- does it compare these values on every hb?? >> > That >> > >>>>>> seems >> > >>>>>>>> like a lot of broker load. If this is all strictly for static >> > >>>> membership >> > >>>>>>>> then have we considered leaving static membership support for a >> > >>>> followup >> > >>>>>>>> KIP? Not necessarily advocating for that, just wondering if >> this >> > was >> > >>>>>>>> thought about already. Imo it would be acceptable to not >> support >> > >>>> static >> > >>>>>>>> membership in the first version (especially if we fixed other >> > issues >> > >>>>>> like >> > >>>>>>>> making the process Id configurable to get actual stable >> > >>>> assignments!) >> > >>>>>>>> S13.v I'm finding the concept of group epoch here to be >> somewhat >> > >>>>>>>> overloaded. It sounds like it's trying to keep track of >> multiple >> > >>>> things >> > >>>>>>>> within a single version: the group membership, the topology >> > >>>> version, the >> > >>>>>>>> assignment version, and various member statuses (eg rack >> ID/client >> > >>>>>> tags). >> > >>>>>>>> Personally, I think it would be extremely valuable to unroll >> all >> > of >> > >>>> this >> > >>>>>>>> into separate epochs with clear definitions and boundaries and >> > >>>> triggers. >> > >>>>>>>> For example, I would suggest something like this: >> > >>>>>>>> >> > >>>>>>>> group epoch: describes the group at a point in time, bumped >> when >> > >>>> set of >> > >>>>>>>> member ids changes (ie static or dynamic member leaves or >> joins) >> > -- >> > >>>>>>>> triggered by group coordinator noticing a change in group >> > membership >> > >>>>>>>> topology epoch: describes the topology version, bumped for each >> > >>>>>> successful >> > >>>>>>>> StreamsGroupInitialize that changes the broker's topology ID -- >> > >>>>>> triggered >> > >>>>>>>> by group coordinator bumping the topology ID (ie >> > >>>> StreamsGroupInitialize) >> > >>>>>>>> (optional) member epoch: describes the member version, bumped >> > when a >> > >>>>>>>> memberId changes its rack ID, client tags, or process ID (maybe >> > >>>> topology >> > >>>>>>>> ID, not sure) -- is this needed only for static membership? >> Going >> > >>>> back >> > >>>>>> to >> > >>>>>>>> question S12, when is the member epoch is bumped in the current >> > >>>>>> proposal? >> > >>>>>>>> assignment epoch: describes the assignment version, bumped when >> > the >> > >>>>>>>> assignor runs successfully (even if the actual assignment of >> tasks >> > >>>> does >> > >>>>>> not >> > >>>>>>>> change) -- can be triggered by tasks completing warmup, a >> manual >> > >>>>>>>> client-side trigger (future KIP only), etc (by definition is >> > bumped >> > >>>> any >> > >>>>>>>> time any of the other epochs are bumped since they all trigger >> a >> > >>>>>>>> reassignment) >> > >>>>>>>> >> > >>>>>>>> S14. The KIP also mentions a new concept of an "assignment >> epoch" >> > >>>> but >> > >>>>>>>> doesn't seem to ever elaborate on how this is defined and what >> > it's >> > >>>> used >> > >>>>>>>> for. I assume it's bumped each time the assignment changes and >> > >>>>>> represents a >> > >>>>>>>> sort of "assignment version", is that right? Can you describe >> in >> > >>>> more >> > >>>>>>>> detail the difference between the group epoch and the >> assignment >> > >>>> epoch? >> > >>>>>>>> >> > >>>>>>>> S15. 
S15. I assume the "InstanceId" field in the HB request corresponds to the group.instance.id config of static membership. I always felt this field name was too generic and easy to get mixed up with other identifiers; WDYT about "StaticId" or "StaticInstanceId" to make the static membership relation more explicit?

S16. What is the "flexibleVersions" field in the RPC protocols? I assume this is related to versioning compatibility, but it's not quite clear from the KIP how it is to be used. For example, if we modify the protocol by adding new fields at the end, we'd bump the version, but should the flexibleVersions field change as well?
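For background on S16 (general Kafka protocol context, not something the KIP defines): RPCs are specified in JSON message definitions, and "flexibleVersions" declares which versions of the message are serialized in the flexible format that supports tagged/optional fields. A minimal, invented example:

    {
      "apiKey": 99,                      // placeholder value, not a real API key
      "type": "request",
      "name": "ExampleStreamsRequest",   // invented name, for illustration only
      "validVersions": "0",
      "flexibleVersions": "0+",          // all versions use the flexible format with tagged fields
      "fields": [
        { "name": "GroupId", "type": "string", "versions": "0+",
          "about": "The group identifier." }
      ]
    }

Adding a new field at the end for a new version would normally extend validVersions and set the new field's "versions" range, without touching flexibleVersions.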
On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:

Hi Sophie,

Thanks for your detailed comments - much appreciated! I think you read a version of the KIP that did not yet include the admin command-line tool and the Admin API extensions, so some of the comments are already addressed in the KIP.

S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in the consumer background thread. Note that in the new consumer threading model, all RPCs are run by the background thread; see https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design for more information. Both of our RPCs are just part of group membership management and do not need to be invoked explicitly by the application thread. You could imagine calling the initialize RPC explicitly (to implement explicit initialization), but this would still mean sending an event to the background thread, and the background thread in turn invokes the RPC. However, explicit initialization would require some additional public interfaces that we are not including in this KIP. StreamsGroupDescribe is called by the AdminClient, and used by the command-line tool kafka-streams-groups.sh.

S2. I think the max.warmup.replicas=100 suggested by Nick was intended as the upper limit for setting the group configuration on the broker - just to make sure that this was not a misunderstanding. By default, values above 100 should be rejected when setting a specific value for a group. Are you suggesting 20 or 30 as the default value for groups, or as the default upper limit for the group configuration?

S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The MemberEpoch=-1 is a left-over from an earlier discussion. It means that the member is leaving the group, so the intention was that the member must leave the group when it asks the other members to shut down. We later reconsidered this and decided that all applications should just react to the shutdown application signal that is returned by the broker, so the client first sets ShutdownApplication and later leaves the group. Thanks for spotting this, I removed it.

S4. Not sure if this refers to the latest version of the KIP. We already added an extension of the admin API and a kafka-streams-groups.sh command-line tool to the KIP.

S5. All RPCs for dealing with offsets will keep working with streams groups. The extension of the admin API is rather cosmetic, since the method names use "consumer group". The RPCs, however, are generic and do not need to be changed.

S6. Yes, you can use the DeleteGroup RPC with any group ID, whether it is a streams group or not.

S7. See the admin API section.
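To illustrate S5/S6: assuming the existing Admin APIs really do keep working unchanged for streams groups as described above, reading committed offsets and deleting a streams group would look just like it does for consumer groups today (the group ID below is made up):

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class StreamsGroupAdminExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                // The offset RPCs are group-type agnostic, so committed offsets of a
                // streams group are readable through the existing "consumer group" methods.
                Map<TopicPartition, OffsetAndMetadata> offsets =
                        admin.listConsumerGroupOffsets("my-streams-app") // made-up group ID
                             .partitionsToOffsetAndMetadata()
                             .get();
                offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));

                // DeleteGroups likewise accepts any group ID, streams group or not.
                admin.deleteConsumerGroups(List.of("my-streams-app")).all().get();
            }
        }
    }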
S8. I guess for both A and B, I am not sure what you are suggesting. Do you want to make the broker-side topology immutable and not include any information about the topology, like the topology ID, in the RPC? It would seem that this would be a massive foot-gun for people if they start changing their topology and don't notice that the broker is looking at a completely different version of the topology. Or did you mean that there is some kind of topology ID, so that at least we can detect inconsistencies between broker- and client-side topologies, and we fence out any member with an incorrect topology ID? Then we seem to end up with mostly the same RPCs and the same questions (how is the topology ID generated?). I agree that the latter could be an option. See the summary at the end of this message.

S9. If we flat out refuse topology updates, agreed - we cannot let the application crash on minor changes to the topology. However, if we allow topology updates as described in the KIP, there are only upsides to making the topology ID more sensitive. All it will cause is that a client will have to resend a `StreamsGroupInitialize`, and during the rolling bounce, older clients will not get tasks assigned.

S10. The intention in the KIP is really just this - old clients can only retain tasks, but not get new ones. If the topology has new sub-topologies, these will initially be assigned to the new clients, which also have the new topology. You are right that rolling an application where the old structure and the new structure are incompatible (e.g. different subtopologies access the same partition) will cause problems. But this will also cause problems in the current protocol, so I'm not sure it's strictly a regression - it's just unsupported (which we can only make a best effort to detect).

In summary, for the topology ID discussion, I mainly see two options:
1) stick with the current KIP proposal
2) define the topology ID as the hash of the StreamsGroupInitialize topology metadata only, as you suggested, and fence out members with an incompatible topology ID.
I think doing 2 is also a good option, but in the end it comes down to how important we believe topology updates (where the set of sub-topologies or the set of internal topics is affected) to be. The main goal is to make the new protocol a drop-in replacement for the old protocol, and at least define the RPCs in a way that we won't need another KIP which bumps the RPC version to make the protocol production-ready. Adding more command-line tools or public interfaces seems less critical, as it doesn't change the protocol and can easily be done later on. The main question becomes: is the protocol production-ready and sufficient to replace the classic protocol if we go with 2?

Cheers,
Lucas
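To make option 2 a bit more concrete, here is a purely illustrative sketch (not the KIP's actual definition) of deriving a topology ID by hashing only the structural topology metadata from StreamsGroupInitialize, e.g. the sub-topologies and their topics:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.HexFormat;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class TopologyIdSketch {

        // Purely illustrative: derive a topology ID from the structural metadata that
        // would be sent in StreamsGroupInitialize (sub-topology name -> its topics),
        // iterated in sorted order so the hash is deterministic.
        static String topologyId(Map<String, List<String>> subtopologyToTopics) throws Exception {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (Map.Entry<String, List<String>> entry : new TreeMap<>(subtopologyToTopics).entrySet()) {
                digest.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
                digest.update((byte) 0); // separator to avoid ambiguous concatenations
                for (String topic : entry.getValue().stream().sorted().toList()) {
                    digest.update(topic.getBytes(StandardCharsets.UTF_8));
                    digest.update((byte) 0);
                }
            }
            return HexFormat.of().formatHex(digest.digest());
        }

        public static void main(String[] args) throws Exception {
            // Hypothetical topology: two sub-topologies connected by a repartition topic.
            String id = topologyId(Map.of(
                    "subtopology-0", List.of("input-topic", "app-repartition"),
                    "subtopology-1", List.of("app-repartition")));
            System.out.println("topology ID: " + id);
        }
    }

With a definition along these lines, only changes to the set of sub-topologies or their topics would change the ID, which is what would make fencing on the topology ID meaningful.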