Thanks for the updates! A few minor questions before we wrap this up and move to a vote:

S1. Can you clarify the outward-facing behavior of a group that enters the NOT_READY state due to, say, missing source topics? It sounds like the application will continue to run without processing anything, which would be a change compared to today, where we shut down with an explicit error. Am I reading this correctly, and is this change intentional?

S2. Regarding topology upgrades, I am fully in favor of Bruno's proposal. I just feel like we need to flesh it out a bit. For example, you say "no specific tooling" is needed, but how would a user go about upgrading the topology? If we can describe this process in a bit more detail, then I would feel good.

S3. Lastly, just a quick reminder: we should make sure to clarify in the KIP how the topology ID will be computed, since that impacts which kinds of upgrades are possible and is therefore part of the public API. IIUC the plan is to compute it from the topics, which makes sense to me (though I do think it should eventually be pluggable, perhaps in a follow-up KIP).

On Thu, Nov 7, 2024 at 10:08 AM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Lucas and all others,
>
> Thanks for the proposals regarding the topology upgrades!
>
> I have an additional proposal:
>
> We introduce a topology epoch alongside the topology ID. The topology
> epoch is set to 0 on the group coordinator for a new group. The topology
> epoch is also a config within the Streams client; the default is 1.
>
> When the Streams client starts up, it sends its topology epoch alongside
> the topology ID and the topology to the group coordinator. The group
> coordinator initializes the topology with the given topology ID and
> increases the topology epoch to 1.
>
> When the client wants to upgrade a topology, it needs to send a valid
> topology epoch alongside the new topology ID and the topology. We can
> decide if the new topology epoch = current topology epoch at group
> coordinator + 1 or just new topology epoch > current topology epoch on
> group coordinator.
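
To make the two candidate validity rules above concrete, here is a minimal sketch in Python. It is only a language-neutral model of the comparison, not Kafka code: the function names `is_valid_strict` and `is_valid_relaxed` are hypothetical, and the KIP may end up defining validity differently.

```python
# Hypothetical model of the two epoch-validity rules discussed above.
# Neither function exists in Kafka; the names are illustrative only.

def is_valid_strict(coordinator_epoch: int, client_epoch: int) -> bool:
    """Rule A: the new epoch must be exactly the coordinator's epoch + 1."""
    return client_epoch == coordinator_epoch + 1


def is_valid_relaxed(coordinator_epoch: int, client_epoch: int) -> bool:
    """Rule B: the new epoch only has to be greater than the coordinator's."""
    return client_epoch > coordinator_epoch


if __name__ == "__main__":
    # New group: coordinator starts at 0, client default is 1.
    print(is_valid_strict(0, 1), is_valid_relaxed(0, 1))
    # Stale client (epoch 2) joining a group that is already at epoch 3.
    print(is_valid_strict(3, 2), is_valid_relaxed(3, 2))
    # Client that skips ahead (epoch 5) while the group is at epoch 3.
    print(is_valid_strict(3, 5), is_valid_relaxed(3, 5))
```

Under both rules a client with an older epoch is rejected; the rules differ only in whether a client may skip epochs.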
>
> When the group coordinator receives a topology ID that differs from the
> one initialized for the group, the group coordinator only upgrades to the
> new topology if the topology epoch is valid. Then, it sets the topology
> epoch to the new value. If the topology epoch is not valid, that means
> the topology ID is an old one and the group coordinator does not
> re-initialize the topology.
>
> The current topology epoch can be retrieved with the describe RPC.
>
> Accidental rollbacks of topologies are not possible. Only wanted
> rollbacks of topologies with an appropriate topology epoch are allowed.
>
> Specific tooling is not needed.
>
> The topology upgrade can be automated in CI in the following way:
>
> 1. Call streams group describe to determine the current epoch.
>
> 2. Set the Streams client config topology.epoch = current epoch + 1
>
> 3. Roll the application.
>
> WDYT?
>
> Best,
> Bruno
>
>
> On 07.11.24 18:26, Lucas Brutschy wrote:
> > Hi all,
> >
> > I have updated the KIP with some details around how we handle the
> > cases when essential topics required by the topology are not present.
> > This is described in a new section "Handling topic topology
> > mismatches". The short summary is that we enter a state where the
> > group member's heartbeats will succeed, but no assignments will be
> > made, and the nature of the topic misconfiguration is communicated to
> > the members using the `status` field of the heartbeat response.
> >
> > Working towards putting this KIP up for a vote, there was one last
> > concern mentioned during our last sync, which was a concern raised by
> > Sophie and others around topology updating. I would like to propose
> > the following changes to the KIP to solidify the story around topology
> > updating.
> >
> > To recap, a member sends its topology metadata every time it attempts
> > to join a streams group.
> > When the group already has a topology set,
> > the KIP proposes to update the topology for the group immediately with
> > that first heartbeat of a member. All other members will then receive
> > a non-fatal group status INCONSISTENT_TOPOLOGY, letting them know that
> > their topology is not equal to the group's topology anymore. Those
> > members with an outdated topology will be able to retain their current
> > tasks, but will not receive new tasks, until they have also updated
> > their topology. This enables a rolling bounce to upgrade the topology.
> >
> > There is a concern with this behavior, raised both in our discussions
> > and by Guozhang and Sophie. The problem is that a “misbehaving”
> > member can always roll back a topology upgrade, if it is still running
> > with the old version of the topology. For example, if we roll 20
> > members to a new topology version, and we have successfully restarted
> > 19 out of 20, but the last member is restarted with the old topology,
> > this will cause the topology version on the broker to be rolled back.
> > This could happen, for example, because the topology update fails on
> > the last member, or the last member runs into a fatal error of some
> > kind (out-of-memory or being fenced, for example). Once the failed
> > member rejoins, it updates the topology on the broker to the “old”
> > topology, and so performs an involuntary rollback. Now, we would be in
> > a situation where 19 out of 20 members are blocked from getting new
> > tasks assigned. We would be in this state only temporarily, until the
> > final, misbehaving member is restarted correctly with the new topology.
> >
> > The original suggestion in the KIP discussion to solve these issues
> > (for now) was to exclude topology updates from the KIP.
> > However, it
> > was confirmed by several people (such as Anna) that topology updates
> > are quite common in the wild, so for having a production-ready
> > implementation of the new protocol, a mechanism for topology updates
> > is required. It is not clear how strict the requirement for rolling
> > updates is in the first version of the protocol. No ‘better’
> > alternative for updating topologies was suggested.
> >
> > Proposal 1) Making topology updating optional: We modify the heartbeat
> > request to allow joining a group without sending a topology (only a
> > topology ID). When joining a group this way, the member is explicitly
> > requesting to not update the topology of the group. In other words,
> > leaving the topology as null will mean that no topology update will
> > be performed on the broker side. If the topology ID does not match the
> > topology ID of the group, and no topology update is performed, then
> > the member that joined will just behave like any other member with an
> > inconsistent topology ID. That is, it will not get any tasks assigned.
> > Streams clients will never send the topology when they know they are
> > just rejoining the group, e.g. because the client was fenced and
> > dropped out of the group. That means a topology update is only
> > initiated when a streams application is first started up. This will
> > prevent "accidental" topology downgrades if we know that we are
> > rejoining the group. However, if we rejoin the group without knowing
> > that we are just rejoining (e.g. since the whole pod was restarted),
> > we could still run into temporary topology downgrades.
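
As a rough illustration of Proposal 1 above, the join handling could be modeled as follows. This is a sketch under assumptions, written in Python purely as a model of the logic: `Group`, `handle_join`, and the `"OK"` status are hypothetical names, while `INCONSISTENT_TOPOLOGY` is the status mentioned earlier in this mail. How a join without a topology into a group that has none yet would be treated is not specified here, so the sketch simply reports a mismatch in that case.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical model of Proposal 1; not the actual group coordinator code.


@dataclass
class Group:
    topology_id: Optional[str] = None
    topology: Optional[dict] = None


def handle_join(group: Group, member_topology_id: str,
                member_topology: Optional[dict] = None) -> str:
    """Return the group status reported back to the joining member."""
    if member_topology is not None:
        # Member explicitly requests a topology update (fresh start-up).
        group.topology_id = member_topology_id
        group.topology = member_topology
        return "OK"
    # Topology left as null: never update the group's topology.
    if group.topology_id == member_topology_id:
        return "OK"
    # Mismatch without update: member gets no tasks assigned.
    return "INCONSISTENT_TOPOLOGY"


if __name__ == "__main__":
    group = Group()
    print(handle_join(group, "topo-v2", {"subtopologies": 1}))
    # A fenced member on the old topology rejoins without a topology.
    print(handle_join(group, "topo-v1"))
    print(group.topology_id)
```

In this model a rejoin without a topology cannot roll the group back, but a full restart that sends the old topology still can, which matches the caveat at the end of Proposal 1.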
> >
> > Proposal 2) Disable topology updates by default on the client:
> > Second, we can disable topology updates by default from the client
> > side, by extending the streams configuration with a config
> > enable_topology_updates which defaults to false and extending the
> > heartbeat RPC by a flag UpdateTopology which propagates that config
> > value to the broker. The advantage of having a client-side config is
> > that, during a roll, I can set the configuration only for updated
> > clients, which can ensure that no accidental rollbacks happen. The
> > downside is that after the roll, I would have to do a second roll to
> > disable enable_topology_updates again, so that we are safe during the
> > next roll again. However, avoiding the second roll is more of a
> > usability optimization, which is conceptually more complex and could
> > be addressed in a follow-up KIP, as in proposal 3.
> >
> > Proposal 3) Fixing the topology ID during upgrades (follow-up KIP): To
> > fully prevent accidental downgrades, one could introduce a broker-side
> > group-level config that restricts the possible topology IDs that a
> > client can upgrade to. This would be quite similar in safety to making
> > topology updates manual. For updating a topology, the user would have
> > to first, extract the topology ID from the updated topology - this
> > could e.g. be logged when the application is started, second, set the
> > group-level config using an admin client (something like
> > group.streams.allowed_upgrade_topology_id), third, roll the
> > application. This would require quite some extra tooling (how to get
> > the topology ID), so I’d propose introducing this in a follow-up KIP.
> >
> > Let me know if there are any concerns with this approach.
> >
> > Cheers,
> > Lucas
> >
> > On Mon, Oct 7, 2024 at 9:42 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>
> >> Hi all,
> >>
> >> we did some major changes to this KIP that we would like the community
> >> to review.
> >> The changes are the following:
> >>
> >> 1. We merged the Streams group initialize RPC into the Streams group
> >> heartbeat RPC. We decided to merge them because it simplifies the
> >> synchronization between initialization and heartbeat during group
> >> creation and migration from a classic group. A consequence of the merge
> >> is that the heartbeat now needs permissions to create and describe
> >> topics which before only the initialize call had. But we think that is
> >> not a big deal. A consumer of a Streams client will send the metadata of
> >> its topology in its first heartbeat when it joins the group. After that
> >> it will not send the topology metadata anymore.
> >>
> >> 2. We discovered that to create the internal topics, the topology
> >> metadata also needs to include the co-partition groups since we cannot
> >> infer the co-partitioning solely from the sub-topologies and the input
> >> topics of the sub-topologies.
> >>
> >> 3. We added a section that describes how the migration from a classic
> >> group to a streams group and back works. This is very similar to how the
> >> migration in KIP-848 works.
> >>
> >>
> >> Please give us feedback about the changes and the KIP in general.
> >>
> >> We would like to start a vote on this KIP as soon as possible.
> >>
> >> Best,
> >> Bruno
> >>
> >> On 11.09.24 01:06, Sophie Blee-Goldman wrote:
> >>> Just following up to say thank you Lucas for the detailed
> >>> explanations, and especially the thorough response to my more
> >>> "existential" questions about building off 848 vs introducing a
> >>> separate Streams group protocol. This really helped me understand
> >>> the motivations behind this decision. No notes!
> >>>
> >>> On Fri, Sep 6, 2024 at 7:17 AM Lucas Brutschy
> >>> <lbruts...@confluent.io.invalid> wrote:
> >>>
> >>>> Hi Sophie,
> >>>>
> >>>> thanks for getting deeply involved in this discussion.
> >>>>
> >>>> S16. Adding tagged fields would not require an RPC version bump at
> >>>> all.
> >>>> This should already cover a lot of use cases where request
> >>>> versioning wouldn't become an issue, as long as it is safe to ignore
> >>>> the new fields. Other than that, I'm not aware of a formal definition
> >>>> of what request version bumps mean in relation to Apache Kafka
> >>>> versions - if anybody knows, please let me know. In my understanding,
> >>>> the Kafka protocol evolves independently of AK versions, and for the
> >>>> Kafka protocol to change, you require an accepted KIP, and not an AK
> >>>> release. The grey area is that a protocol change without an AK release
> >>>> does not mean much, because even if cloud vendors deploy it before an
> >>>> AK release, virtually no clients would support it and the protocol
> >>>> would fall back to the earlier RPC version. So maybe this isn't that
> >>>> clearly formalized because it does not matter much in practice.
> >>>> Normally, I would expect an RPC version bump after each KIP that
> >>>> breaks RPC compatibility. But there seems to be some flexibility here
> >>>> - for example, the KIP-848 RPCs were changed after the fact in trunk,
> >>>> because they were not exposed yet by default in AK (or any other
> >>>> implementation of the Kafka protocol), so a breaking change without a
> >>>> request version bump was probably not deemed problematic.
> >>>>
> >>>> S17/18. We can add an informational table for the interface to the
> >>>> assignor, and that should also make clear how a client-side assignment
> >>>> interface would look. Generally, our idea would be to send all the
> >>>> input information for the task assignor to a client and expect all the
> >>>> output information of a task assignor in return, very similar to
> >>>> KIP-848. We did not include such a table because we do not define a
> >>>> public interface to define custom broker-side assignors in this KIP.
> >>>> But generally, the input / output of the (task) assignor mostly
> >>>> follows the data fields sent to / from the brokers.
Let me do that, > >>>> but first I'll respond to the rest of your questions that seem more > >>>> important. > >>>> > >>>> S19. I think an explicit trigger for rebalance could make sense, but > >>>> let me explain the thinking why we haven't included this in the > >>>> current KIP: We don't want to include it for client-side assignment > >>>> obviously, because this KIP does not include client-side assignment. > >>>> For triggering a rebalance when a warm-up has caught up, this is > >>>> treated differently in the KIP: the task assignor is called when a > >>>> client sends updated task offsets. The client does not update its task > >>>> offsets regularly, but according to an interval > >>>> `task.offset.interval.ms`, but also immediately when a client notices > >>>> that a warm-up has caught up. So effectively, it is mostly the client > >>>> that is constantly checking the lag. I absolutely believe that the > >>>> client should be checking for `acceptable.recovery.lag`, and we are > >>>> using the presence of the task offset field to trigger reassignment. > >>>> What we weren't sure is how often we need the task offsets to be > >>>> updated and how often we want to retrigger the assignment because of > >>>> that. An explicit field for "forcing" a reassignment may still make > >>>> sense, so that we can update the task offsets on the broker more often > >>>> than we call the assignor. However, I'm not convinced that this should > >>>> just be a generic "rebalance trigger", as this seems to be going > >>>> against the goals of KIP-848. Generally, I think one of the principles > >>>> of KIP-848 is that a member just describes its current state and > >>>> metadata, and the group coordinator makes a decision on whether > >>>> reassignment needs to be triggered. So it would seem more natural to > >>>> just indicate a boolean "I have caught-up warm-up tasks" to the group > >>>> coordinator. 
> >>>> I understand that for client-side assignment, you may
> >>>> want to trigger assignment explicitly without the group coordinator
> >>>> understanding why, but then we should add such a field as part of the
> >>>> client-side assignment KIP.
> >>>>
> >>>> S20. Yes, inside the group coordinator, the main differences are
> >>>> topology initialization handling, and the semantics of task assignment
> >>>> vs. partition assignment. It will not be a completely separate group
> >>>> coordinator, and we will not change things like the offset reading and
> >>>> writing. What will have its own code path is the handling of
> >>>> heartbeats and the reconciliation of the group (so, which tasks are
> >>>> sent to which member at which point in time until we reach the
> >>>> declarative target assignment that was produced by a client-side or
> >>>> server-side assignor). Also, the assignor implementations will
> >>>> obviously be completely separate from the consumer group assignors,
> >>>> but that is already the case currently. Streams groups will co-exist
> >>>> in the group coordinator with share groups (for queues), classic
> >>>> groups (old generic protocol for consumers and connect) and consumer
> >>>> groups (from KIP-848), and potentially a new group type for connect as
> >>>> well. Each of these group types has a separate code path for
> >>>> heartbeating, reconciling assignments, keeping track of the internal
> >>>> state of the group and responding to specific requests from admin API,
> >>>> like ConsumerGroupDescribe. For streams, there will be a certain amount
> >>>> of duplication from consumer groups - for example, liveness tracking
> >>>> for members (setting timers for the session timeout etc.) is
> >>>> essentially the same between both groups, and the reconciliation logic
> >>>> is quite similar.
> >>>> We will make an effort to refactor as much of this code as possible
> >>>> in a way that it can be reused by both group types, and a lot of things
> >>>> have already been made generic with the introduction of share groups
> >>>> in the group coordinator.
> >>>>
> >>>> S21. It's nice that we are discussing "why are we doing this" in
> >>>> question 21 :D. I think we are touching upon the reasoning in the KIP,
> >>>> but let me explain the reasoning in more detail. You are right, the
> >>>> reason we want to implement custom RPCs for Kafka Streams is not about
> >>>> broker-side vs. client-side assignment. The broker could implement the
> >>>> Member Metadata versioning scheme of Kafka Streams, intercept consumer
> >>>> group heartbeats that look like Kafka Streams heartbeats, deserialize
> >>>> the custom metadata and run a broker-side assignor for Kafka Streams,
> >>>> and finally serialize custom Kafka Streams assignment metadata, all
> >>>> based on KIP-848 RPCs.
> >>>>
> >>>> First of all, the main difference we see with custom RPCs is really
> >>>> that streams groups become a first-level concept for both the broker
> >>>> and clients of Kafka. This also goes into the answer for S20 (what
> >>>> is different besides topology initialization and task reconciliation).
> >>>> With streams groups, I can use the Admin API call to see the current
> >>>> assignment, the target assignment, all the metadata necessary for
> >>>> assignment, and some information about the current topology in a
> >>>> human-readable and machine-readable form, instead of having to parse
> >>>> it from logs on some client machine that may or may not have the right
> >>>> log-level configured. This will help humans to understand what is
> >>>> going on, but can also power automated tools / CLIs / UIs, and I think
> >>>> this is a major improvement for operating a streams application.
> >>>> Again, this is not about who is in control of the broker or the > >>>> clients, the information becomes accessible for everybody who can call > >>>> the Admin API. It also does not depend on the cloud vendor, it will be > >>>> an improvement available for pure AK users, and will also work with > >>>> client-side assignment (although obviously, if there is an intention > >>>> to start encoding proprietary data in black-box byte-arrays, this > >>>> information remains hidden to users of the open source distribution > >>>> again). > >>>> > >>>> I also think that the original proposal of reusing the new consumer > >>>> group protocol for Kafka Streams has some downsides, that, while not > >>>> being deal-breakers, would make the new behaviour of Kafka Streams > >>>> less than ideal. One example is the reconciliation: if we have a > >>>> custom stream partition assignor (client- or broker-side doesn't > >>>> matter), we can only control the target assignment that the group > >>>> converges to, but none of the intermediate states that the group > >>>> transitions through while attempting to reach the desired end state. > >>>> For example, there is no way to prevent the reconciliation loop of > >>>> KIP-848 from giving an active task to processX before another instance > >>>> on processX has revoked the corresponding standby task. There is no > >>>> synchronization point where these movements are done at the same time > >>>> as in the classic protocol, and on the broker we can't prevent this, > >>>> because the reconciliation loop in KIP-848 does not know what a > >>>> standby task is, or what a process ID is. You could argue, you can let > >>>> the application run into LockExceptions in the meantime, but it will > >>>> be one of the things that will make Kafka Streams work less than > >>>> ideal. Another example is task offsets. 
It is beneficial for streams > >>>> partition assignors to have relatively recent task offset information, > >>>> which, in KIP-848, needs to be sent to the assignor using a black-box > >>>> MemberMetadata byte array. This member metadata byte array is > >>>> persisted to the consumer offset topic every time it changes (for > >>>> failover). So we end up with a bad trade-off: if we send the task > >>>> offset information to the broker very frequently, we will have to > >>>> write those member metadata records very frequently as well. If we do > >>>> not send them frequently, streams task assignors will have very > >>>> outdated lag information. In KIP-1071 we can make the decision to not > >>>> persist the task offset information, which avoids this trade-off. It > >>>> gives us up-to-date lag information in the usual case, but after > >>>> fail-over of the group coordinator, our lag information may be missing > >>>> for a short period of time. These are two examples, but I think the > >>>> general theme is, that with custom RPCs, Kafka Streams is in control > >>>> of how the group metadata is processed and how the tasks are > >>>> reconciled. This is not just a consideration for now, but also the > >>>> future: will KIP-848 reconciliation and metadata management always > >>>> work "good enough" for Kafka Streams, or will there be use cases where > >>>> we want to evolve and customize the behaviour of Kafka Streams groups > >>>> independently of consumer groups. > >>>> > >>>> A final point that I would like to make, I don't think that custom > >>>> streams RPCs are inherently a lot harder to do for the Kafka Streams > >>>> community than "just utilizing" KIP-848, at least not as much as you > >>>> would think. 
> >>>> Many things in KIP-848, such as customizable assignment
> >>>> metadata with a metadata versioning scheme, are made to look like
> >>>> general-purpose mechanisms, but are effectively mechanisms for Kafka
> >>>> Streams and also completely unimplemented at this point in any
> >>>> implementation of the Kafka protocol. We know that there is very
> >>>> little use of custom partition assignors in Kafka besides Kafka
> >>>> Streams, and we can expect the same to be true of these new
> >>>> "customizable" features of KIP-848. So, besides the Kafka Streams
> >>>> community, there is no one driving the implementation of these
> >>>> features. "Just utilizing" KIP-848 for Kafka Streams would
> >>>> effectively mean implementing all this, and still making a lot of deep
> >>>> changes to Kafka Streams (as mechanisms like probing rebalances etc.
> >>>> don't make sense in KIP-848). I think adapting KIP-848 to work with
> >>>> Kafka Streams either way is a big effort. Piggybacking on consumer
> >>>> group RPCs and extending the consumer group protocol to the point that
> >>>> it can carry Kafka Streams on its back would not make things
> >>>> significantly easier. Some things, like online migration from the old
> >>>> protocol to the new protocol or reimplementing metadata versioning on
> >>>> top of KIP-848, may even be harder in KIP-848 than in KIP-1071. I
> >>>> don't think we can consider consumer group RPCs as a shortcut to
> >>>> revamp the streams rebalance protocol, but should be focussing on what
> >>>> makes Kafka Streams work best down the road.
> >>>>
> >>>> Hope that makes sense!
> >>>>
> >>>> Cheers,
> >>>> Lucas
> >>>>
> >>>> On Fri, Sep 6, 2024 at 6:47 AM Sophie Blee-Goldman
> >>>> <sop...@responsive.dev> wrote:
> >>>>>
> >>>>> Thanks for another detailed response Lucas! Especially w.r.t how the
> >>>>> epochs are defined.
> >>>>> I also went back and re-read KIP-848 to refresh myself, I
> >>>>> guess it wasn't clear to me how much of the next-gen consumer
> >>>>> protocol we are reusing vs what's being built from the ground up.
> >>>>>
> >>>>> S16.
> >>>>>
> >>>>>> It will become interesting when we want to include extra data
> >>>>>> after the initial protocol definition.
> >>>>>
> >>>>> I guess that's what I'm asking us to define -- how do we evolve the
> >>>>> schema when expanding the protocol? I think we need to set a few
> >>>>> public contracts here, such as whether/which versions get bumped
> >>>>> when. For example if I add/modify fields twice within a single Kafka
> >>>>> release, do we still need to update the version? I think the answer
> >>>>> is "yes" since I know you Confluent folks upgrade brokers more often
> >>>>> than AK releases, but this is different from how eg the
> >>>>> SubscriptionInfo/AssignmentInfo of the StreamsPartitionAssignor
> >>>>> works today, hence I want to put all that in writing (for those of
> >>>>> us who aren't familiar with how all this works already and may be
> >>>>> modifying it in the future, say, to add client-side assignment :P)
> >>>>>
> >>>>> Maybe a quick example of what to do to evolve this schema is
> >>>>> sufficient?
> >>>>>
> >>>>> S17.
> >>>>> The KIP seems to throw the assignor, group coordinator, topic
> >>>>> initialization, and topology versioning all into a single black box
> >>>>> on the broker. I understand much of this is intentionally being
> >>>>> glossed over as an implementation detail since it's a KIP, but it
> >>>>> would really help to tease apart these distinct players and define
> >>>>> their interactions. For example, what are the inputs and outputs to
> >>>>> the assignor?
> >>>>> For example could we get a table like the "Data Model"
> >>>>> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel>
> >>>>> in 848?
> >>>>>
> >>>>> KIP-848 goes into detail on these things, but again, it's unclear
> >>>>> what parts of 848 are and aren't supposed to be carried through. For
> >>>>> example 848 has a concept of assignor "reasons" and client-side
> >>>>> assignors and assignor metadata which can all trigger a
> >>>>> rebalance/group epoch bump. However in the current plan for 1071
> >>>>> none of these exist. Which leads me to my next point...
> >>>>>
> >>>>> S18.
> >>>>> Many people use a managed Kafka service like Confluent Cloud where
> >>>>> brokers are on the bleeding edge but clients are upgraded less
> >>>>> frequently. However, the opposite is also true for a great many
> >>>>> users: client applications are easy to upgrade and bumped often to
> >>>>> get new features, whereas the brokers (often run by a different team
> >>>>> altogether) are considered riskier to upgrade and stay on one
> >>>>> version for a long time. We can argue about which case is more
> >>>>> common -- in my personal experience it's actually the latter,
> >>>>> although I only realized this after my time at Confluent which was
> >>>>> obviously skewed towards meeting Confluent Cloud users -- but
> >>>>> regardless, we obviously need to support both cases as first-class
> >>>>> citizens. And I'm a bit concerned that there's not enough in the KIP
> >>>>> to ensure that client-side assignment won't end up getting hacked
> >>>>> into the protocol as an afterthought.
> >>>>>
> >>>>> Don't get me wrong, I'm still not demanding you guys implement
> >>>>> client-side assignors as part of this KIP.
I'm happy to leave the implementation > of > >>>>> that to a followup, but I feel we need to flesh out the basic > direction > >>>> for > >>>>> this right now, as part of the v0 Streams protocol. > >>>>> > >>>>> I'm also not asking you to do this alone, and am happy to work > with/for > >>>> you > >>>>> on this. I have one possible proposal sketched out already, although > >>>> before > >>>>> I share that I just wanted to make sure we're aligned on the > fundamentals > >>>>> and have mapped out the protocol in full first (ie S17). Just want to > >>>> have > >>>>> a rough plan for how clients-side assignment will fit into the > picture, > >>>> and > >>>>> right now it's difficult to define because it's unclear where the > >>>>> boundaries between the group coordinator and broker-side assignor > are (eg > >>>>> what are assignor inputs and outputs). > >>>>> > >>>>> I'm committed to ensuring this doesn't add extra work or delay to > this > >>>> KIP > >>>>> for you guys. > >>>>> > >>>>> S19. > >>>>> I think it could make sense to include a concept like the "assignor > >>>>> reason" from 848 as a top-level field and rebalance trigger (ie > group > >>>>> epoch bump). This would mainly be for the client-side assignor to > >>>> trigger a > >>>>> rebalance, but for another example, it seems like we want the > brokers to > >>>> be > >>>>> constantly monitoring warmup tasks and processing lag updates to know > >>>> when > >>>>> a warmup task is to be switched, which is heavy work. Why not just > let > >>>> the > >>>>> client who owns the warmup and knows the lag decide when it has > warmed up > >>>>> and signal the broker? It's a small thing for a client to check when > its > >>>>> task gets within the acceptable recovery lag and notify via the > heartbeat > >>>>> if so, but it seems like a rather big deal for the broker to be > >>>> constantly > >>>>> computing these checks for every task on every hb. 
Just food for > thought, > >>>>> I don't really have a strong preference over where the warmup > decision > >>>>> happens -- simply pointing it out that it fits very neatly into the > >>>>> "assignor reason"/"rebalance requested" framework. Making this a > >>>>> client-side decision also makes it easier to evolve and customize > since > >>>>> Kafka Streams users tend to have more control over > upgrading/implementing > >>>>> stuff in their client application vs the brokers (which is often a > >>>>> different team altogether) > >>>>> > >>>>> S20. > >>>>> To what extent do we plan to reuse implementations of 848 in order to > >>>> build > >>>>> the new Streams group protocol? I'm still having some trouble > >>>> understanding > >>>>> where we are just adding stuff on top and where we plan to rebuild > from > >>>> the > >>>>> ground up. For example we seem to share most of the epoch > reconciliation > >>>>> stuff but will have different triggers for the group epoch bump. Is > it > >>>>> possible to just "plug in" the group epoch and let the consumer's > group > >>>>> coordinator figure stuff out from there? Will we have a completely > >>>> distinct > >>>>> Streams coordinator? I know this is an implementation detail but imo > it's > >>>>> ok to get in the weeds a bit for a large and complex KIP such as > this. > >>>>> > >>>>> Setting aside the topology initialization stuff, is the main > difference > >>>>> really just in how the group coordinator will be aware of tasks for > >>>> Streams > >>>>> groups vs partitions for consumer groups? > >>>>> > >>>>> Which brings me to a high level question I feel I have to ask: > >>>>> > >>>>> S21. > >>>>> Can you go into more detail on why we have to provide Kafka Streams > its > >>>> own > >>>>> RPCs as opposed to just utilizing the protocol from KIP-848? The > Rejected > >>>>> Alternatives does address this but the main argument there seems to > be in > >>>>> implementing a broker-side assignor. But can't you do this with 848? 
> >>>>> And given there are users who rarely upgrade their brokers just as there are users who rarely upgrade their clients, it seems silly to do all this work and change the fundamental direction of Kafka Streams just to give more control over the assignment to the brokers.
> >>>>>
> >>>>> For a specific example, the KIP says "Client-side assignment makes it hard to debug and tune the assignment logic" -- personally, I would find it infinitely more difficult to tune and debug assignment logic if it were happening on the broker. But that's just because I work with the app devs, not the cluster operators. The point is we shouldn't design an entire protocol to give preference to one vendor or another -- nobody wants the ASF to start yelling at us lol D:
> >>>>>
> >>>>> Another example, which imo is a good one (IIUC) is that "a simple parameter change requires redeploying of all streams clients". That's definitely a huge bummer, but again, why would broker-side assignors not be able to trigger a reassignment based on a custom config in the 848 protocol? We could still have the ha and sticky assignors implemented on the brokers, and could still define all the Streams configs as broker configs to be passed into the streams custom broker assignors. That feels like a lot less work than redoing our own group protocol from scratch?
> >>>>>
> >>>>> I don't want you guys to think I'm trying to block the entire direction of this KIP. I'd just like to better understand your thoughts on this since I assume you've discussed this at length internally.
> >>>>> Just help me understand why this huge proposal is justified (for my own sake and to convince any "others" who might be skeptical)
> >>>>>
> >>>>>
> >>>>> On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax <mj...@apache.org> wrote:
> >>>>>
> >>>>>> I still need to catch up on the discussion in general. But as promised, I just started KIP-1088 about the `KafkaClientSupplier` question.
> >>>>>>
> >>>>>> Looking forward to your feedback on the new KIP. I hope we can get KIP-1088 done soon, to not block this KIP.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 9/4/24 09:52, Lucas Brutschy wrote:
> >>>>>>> Hi Sophie,
> >>>>>>>
> >>>>>>> thanks for the questions and comments!
> >>>>>>>
> >>>>>>> S1. KIP-1071 does not need to add public interfaces to streams, but it does not (yet) make the streams protocol client pluggable. Matthias promised to write a KIP that will propose a solution for the KafkaClientSupplier soon. He suggested keeping it separate from this KIP, but we can treat it as a blocker. You are right that we could try fitting an explicit topic initialization in this interface, if we decide we absolutely need it now, although I'm not sure if that would actually define a useful workflow for the users to explicitly update a topology. Explicit initialization cannot be part of admin tools, because we do not have the topology there. It could probably be a config that defaults to off, or a method on the KafkaStreams object? To me, explicit initialization opens a broad discussion on how to do it, so I would like to avoid including it in this KIP. I guess it also depends on the outcome of the discussion in S8-S10.
> >>>>>>>
> >>>>>>> S2.
> >>>>>>> It's hard to define the perfect default, and we should probably aim for safe operations first - since people can always up the limit. I changed it to 20.
> >>>>>>>
> >>>>>>> S11. I was going to write we can add it, but it's actually quite independent of KIP-1071 and also a nice newbie ticket, so I would propose we treat it as out-of-scope and let somebody define it in a separate KIP - any discussion about it would anyway be lost in this long discussion thread.
> >>>>>>>
> >>>>>>> S12. Member epoch behaves the same as in KIP-848. See the KIP-848 explanation for basics:
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup
> >>>>>>>
> >>>>>>> Together with the section "Reconciliation of the group", this should make clear how member epochs, group epochs and assignment epochs work in KIP-1071, but I'll try to summarize the reconciliation a bit more explicitly below:
> >>>>>>>
> >>>>>>> Essentially, the broker drives a reconciliation for each member separately. The goal is to reconcile the assignment generated by the assignor and reach the assignment epoch, but it's an asynchronous process, so each member may still be on an older epoch, which is the member epoch. We do not reinvent any of the internal machinery for streams here, we are just adding standby tasks and warm-up tasks to the reconciliation of KIP-848.
> >>>>>>>
> >>>>>>> During reconciliation, the broker first asks the member to revoke all tasks that are not in its target assignment, by removing them from the current assignment for the member (which is provided in the heartbeat response).
> >>>>>>> Until revocation is completed, the member remains on the old member epoch and in `UNREVOKED` state. Only once the member confirms revocation of these tasks by removing them from the set of owned tasks in the heartbeat request can the member transition to the current assignment epoch and get new tasks assigned. Once the member has revoked all tasks that are not in its target assignment, it transitions to the new epoch and gets new tasks assigned. There are some tasks that may be in the (declarative) target assignment of the member generated by the assignor, but that may not be immediately added to the current assignment of the member, because they are still "unreleased". Unreleased tasks are:
> >>>>>>>
> >>>>>>> - A. For any task, another instance of the same process may own the task in any role (active/standby/warm-up), and we are still awaiting revocation.
> >>>>>>>
> >>>>>>> - B. For an active task, an instance of any process may own the same active task, and we are still awaiting revocation.
> >>>>>>>
> >>>>>>> As long as there is at least one unreleased task, the member remains in state UNRELEASED. Once all tasks of the target assignment are added to the current assignment of the member (because they were released), the member transitions to STABLE.
> >>>>>>>
> >>>>>>> Technically, the first constraint (we cannot assign a task in two different roles to the same process) is only relevant for stateful tasks that use the local state directory. I wonder if we should relax this restriction slightly, by marking sub-topologies that access the local state directory. This information could also be used by the assignor. But we can also introduce this as a follow-up improvement.
> >>>>>>>
> >>>>>>> S13i.
> >>>>>>> You are right, these bumps of the group epoch are "on top" of the KIP-848 reasons for bumping the group epoch. I added the KIP-848 reasons as well, because this was a bit confusing in the KIP.
> >>>>>>>
> >>>>>>> S13ii. The group epoch is bumped immediately once a warm-up task has caught up, since this is what triggers running the assignor. However, the assignment epoch is only bumped once the task assignor has run. If the task assignment for a member hasn't changed, it will transition to the new assignment epoch immediately.
> >>>>>>>
> >>>>>>> S13iii. In the case of a rolling bounce, the group epoch will be bumped for every member that leaves and joins the group. For updating rack ID, client tags, topology ID etc. -- this will mostly play a role for static membership, as you mentioned, but would cause the same thing - in a rolling bounce with static membership, we will attempt to recompute the assignment after every bounce, if the topology ID is changed in the members. This can cause broker load, as you mentioned. Note, however, that we don't have to compute an assignment for every group epoch bump. While KIP-848 computes its assignment as part of the main loop of the group coordinator, it uses asynchronous assignment in the case of client-side assignment, where the assignment is slower. So it will kick off the assignment on the client side, enter state `ASSIGNING`, but may have more group epoch bumps in the meantime while the assignment is computed on the client side. We leave it open to use the same mechanism on the broker to compute assignment asynchronously on a separate thread, in case streams assignment is too slow for the main loop of the group coordinator.
> >>>>>>> We will also consider rate-limiting the assignor to reduce broker load. However, it's too early to define this in detail, as we don't know much about the performance impact yet.
> >>>>>>>
> >>>>>>> S13iv. The heartbeat just sends "null" instead of the actual value in case the value didn't change since the last heartbeat, so no comparison is required. This is defined in the heartbeat RPC and is the same as in KIP-848.
> >>>>>>>
> >>>>>>> S13v. I think I disagree. The group epoch represents a version of the group state, including the current topology and the metadata of all members -- the whole input to the task assignor -- and defines exactly the condition under which we attempt to recompute the assignment. So it's not overloaded, and has a clear definition. It's also exactly the same as in KIP-848, and we do not want to move away too far from that protocol, unless it's something that is specific to streams that needs to be changed. Also, the protocol already uses three types of epochs - member epoch, assignment epoch, group epoch. I don't think adding more epochs will make things clearer. Unless we have a strong reason why we need to add extra epochs, I'm not in favor.
> >>>>>>>
> >>>>>>> S14. Assignment epoch is the same as in KIP-848:
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment
> >>>>>>>
> >>>>>>> Its definition does not change at all, so we didn't describe it in detail in KIP-1071.
> >>>>>>>
> >>>>>>> S15. I agree that the name is a bit generic, but the time to change this was when `group.instance.id` was introduced.
> >>>>>>> Also, if we start diverging from regular consumers / consumer groups in terms of naming, it will not make things clearer. "application.id" and "group.id" are bad enough, in my eyes. I hope it's okay if I just improve the field documentation.
> >>>>>>>
> >>>>>>> S16. This goes back to KIP-482. Essentially, in flexible request schema versions you can define optional fields that will be encoded with a field tag on the wire, but can also be completely omitted. This is similar to how fields are encoded in, e.g., protobuf. It improves schema evolution, because we can add new fields to the schema without bumping the version. There is also a minor difference in encoding size: when encoded, tagged fields take up more bytes on the wire because of the field tag, but they can be omitted completely. It will become interesting when we want to include extra data after the initial protocol definition.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Lucas
> >>>>>>>
> >>>>>>> On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman <sop...@responsive.dev> wrote:
> >>>>>>>>
> >>>>>>>> Ah, my bad -- I thought I refreshed the page to get the latest version which is why I was a bit confused when I couldn't find anything about the new tools which I had previously seen in the KIP. Sorry for the confusion and unnecessary questions
> >>>>>>>>
> >>>>>>>> S1.
> >>>>>>>>
> >>>>>>>>> You could imagine calling the initialize RPC explicitly (to implement explicit initialization), but this would still mean sending an event to the background thread, and the background thread in turn invokes the RPC. However, explicit initialization would require some additional public interfaces that we are not including in this KIP.
> >>>>>>>>
> >>>>>>>> I'm confused -- if we do all the group management stuff in a background thread to avoid needing public APIs, how do we pass all the Streams-specific and app-specific info to the broker for the StreamsGroupInitialize? I am guessing this is where we need to invoke internal APIs and is related to our discussion about removing the KafkaClientSupplier. However, it seems to me that no matter how you look at it, Kafka Streams will need to pass information to and from the background thread if the background thread is to be aware of things like tasks, offset sums, repartition topics, etc
> >>>>>>>>
> >>>>>>>> We don't really need to rehash everything here since it seems like the proposal to add a StreamsConsumer/StreamsAssignment interface will address this, which I believe we're in agreement on. I just wanted to point out that this work to replace the KafkaClientSupplier is clearly intimately tied up with KIP-1071, and should imho be part of this KIP and not its own standalone thing.
> >>>>>>>>
> >>>>>>>> By the way I'm happy to hop on a call to help move things forward on that front (or anything really). Although I guess we're just waiting on a design right now?
> >>>>>>>>
> >>>>>>>> S2. I was suggesting something lower for the default upper limit on all groups. I don't feel too strongly about it, just wanted to point out that the tradeoff is not about the assignor runtime but rather about resource consumption, and that too many warmups could put undue load on the cluster.
> >>>>>>>> In the end if we want to trust application operators then I'd say it's fine to use a higher cluster-wide max like 100.
> >>>>>>>>
> >>>>>>>> S8-10 I'll get back to you on this in another followup since I'm still thinking some things through and want to keep the discussion rolling for now. In the meantime I have some additional questions:
> >>>>>>>>
> >>>>>>>> S11. One of the main issues with unstable assignments today is the fact that assignors rely on deterministic assignment and use the process Id, which is not configurable and only persisted via local disk in a best-effort attempt. It would be a very small change to include this in KIP-1071 (like 5 LOC), WDYT? (Would even be willing to do the PR for this myself so as not to add to your load). There's a ticket for it here:
> >>>>>>>> https://issues.apache.org/jira/browse/KAFKA-15190
> >>>>>>>>
> >>>>>>>> S12. What exactly is the member epoch and how is it defined? When is it bumped? I see in the HB request field that certain values signal a member joining/leaving, but I take it there are valid positive values besides the 0/-1/-2 codes? More on this in the next question:
> >>>>>>>>
> >>>>>>>> S13. The KIP says the group epoch is updated in these three cases:
> >>>>>>>> a. Every time the topology is updated through the StreamsGroupInitialize API
> >>>>>>>> b. When a member with an assigned warm-up task reports a task changelog offset and task changelog end offset whose difference is less than acceptable.recovery.lag.
> >>>>>>>> c. When a member updates its topology ID, rack ID, client tags or process ID.
> >>>>>>>> Note: Typically, these do not change within the lifetime of a Streams client, so this only happens when a member with static membership rejoins with an updated configuration.
> >>>>>>>>
> >>>>>>>> S13.i First, just to clarify, these are not the *only* times the group epoch is bumped but rather the additional cases on top of the regular consumer group protocol -- so it's still bumped when a new member joins or leaves, etc, right?
> >>>>>>>> S13.ii Second, does (b) imply the epoch is bumped just whenever the broker notices a task has finished warming up, or does it first reassign the task and then bump the group epoch as a result of this task reassignment?
> >>>>>>>> S13.iii Third, (a) mentions the group epoch is bumped on each StreamsGroupInitialize API but (c) says it gets bumped whenever a member updates its topology ID. Does this mean that in the event of a rolling upgrade that changes the topology, the group epoch is bumped just once or for each member that updates its topology ID? Or does the "updating its topology ID" somehow have something to do with static membership?
> >>>>>>>> S13.iv How does the broker know when a member has changed its rack ID, client tags, etc -- does it compare these values on every hb?? That seems like a lot of broker load. If this is all strictly for static membership then have we considered leaving static membership support for a followup KIP? Not necessarily advocating for that, just wondering if this was thought about already.
> >>>>>>>> Imo it would be acceptable to not support static membership in the first version (especially if we fixed other issues like making the process Id configurable to get actual stable assignments!)
> >>>>>>>> S13.v I'm finding the concept of group epoch here to be somewhat overloaded. It sounds like it's trying to keep track of multiple things within a single version: the group membership, the topology version, the assignment version, and various member statuses (eg rack ID/client tags). Personally, I think it would be extremely valuable to unroll all of this into separate epochs with clear definitions and boundaries and triggers. For example, I would suggest something like this:
> >>>>>>>>
> >>>>>>>> group epoch: describes the group at a point in time, bumped when the set of member ids changes (ie static or dynamic member leaves or joins) -- triggered by group coordinator noticing a change in group membership
> >>>>>>>> topology epoch: describes the topology version, bumped for each successful StreamsGroupInitialize that changes the broker's topology ID -- triggered by group coordinator bumping the topology ID (ie StreamsGroupInitialize)
> >>>>>>>> (optional) member epoch: describes the member version, bumped when a memberId changes its rack ID, client tags, or process ID (maybe topology ID, not sure) -- is this needed only for static membership? Going back to question S12, when is the member epoch bumped in the current proposal?
> >>>>>>>> assignment epoch: describes the assignment version, bumped when the assignor runs successfully (even if the actual assignment of tasks does not change) -- can be triggered by tasks completing warmup, a manual client-side trigger (future KIP only), etc (by definition is bumped any time any of the other epochs are bumped since they all trigger a reassignment)
> >>>>>>>>
> >>>>>>>> S14. The KIP also mentions a new concept of an "assignment epoch" but doesn't seem to ever elaborate on how this is defined and what it's used for. I assume it's bumped each time the assignment changes and represents a sort of "assignment version", is that right? Can you describe in more detail the difference between the group epoch and the assignment epoch?
> >>>>>>>>
> >>>>>>>> S15. I assume the "InstanceId" field in the HB request corresponds to the group.instance.id config of static membership. I always felt this field name was too generic and easy to get mixed up with other identifiers, WDYT about "StaticId" or "StaticInstanceId" to make the static membership relation more explicit?
> >>>>>>>>
> >>>>>>>> S16. What is the "flexibleVersions" field in the RPC protocols? I assume this is related to versioning compatibility but it's not quite clear from the KIP how this is to be used. For example if we modify the protocol by adding new fields at the end, we'd bump the version but should the flexibleVersions field change as well?
> >>>>>>>>
> >>>>>>>> On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Sophie,
> >>>>>>>>>
> >>>>>>>>> Thanks for your detailed comments - much appreciated!
> >>>>>>>>> I think you read a version of the KIP that did not yet include the admin command-line tool and the Admin API extensions, so some of the comments are already addressed in the KIP.
> >>>>>>>>>
> >>>>>>>>> S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in the consumer background thread. Note that in the new consumer threading model, all RPCs are run by the background thread. Check out this:
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
> >>>>>>>>> for more information. Both our RPCs are just part of the group membership management and do not need to be invoked explicitly by the application thread. You could imagine calling the initialize RPC explicitly (to implement explicit initialization), but this would still mean sending an event to the background thread, and the background thread in turn invokes the RPC. However, explicit initialization would require some additional public interfaces that we are not including in this KIP. StreamsGroupDescribe is called by the AdminClient, and used by the command-line tool kafka-streams-groups.sh.
> >>>>>>>>>
> >>>>>>>>> S2. I think the max.warmup.replicas=100 suggested by Nick was intended as the upper limit for setting the group configuration on the broker. Just to make sure that this was not a misunderstanding. By default, values above 100 should be rejected when setting a specific value for a group. Are you suggesting 20 or 30 for the default value for groups, or the default upper limit for the group configuration?
> >>>>>>>>>
> >>>>>>>>> S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION.
> >>>>>>>>> The MemberEpoch=-1 is a left-over from an earlier discussion. It means that the member is leaving the group, so the intention was that the member must leave the group when it asks the other members to shut down. We later reconsidered this and decided that all applications should just react to the shutdown application signal that is returned by the broker, so the client first sets the ShutdownApplication and later leaves the group. Thanks for spotting this, I removed it.
> >>>>>>>>>
> >>>>>>>>> S4. Not sure if this refers to the latest version of the KIP. We added an extension of the admin API and a kafka-streams-groups.sh command-line tool to the KIP already.
> >>>>>>>>>
> >>>>>>>>> S5. All RPCs for dealing with offsets will keep working with streams groups. The extension of the admin API is rather cosmetic, since the method names use "consumer group". The RPCs, however, are generic and do not need to be changed.
> >>>>>>>>>
> >>>>>>>>> S6. Yes, you can use the DeleteGroup RPC with any group ID, whether streams group or not.
> >>>>>>>>>
> >>>>>>>>> S7. See the admin API section.
> >>>>>>>>>
> >>>>>>>>> S8. I guess for both A and B, I am not sure what you are suggesting. Do you want to make the broker-side topology immutable and not include any information about the topology, like the topology ID in the RPC? It would seem that this would be a massive foot-gun for people, if they start changing their topology and don't notice that the broker is looking at a completely different version of the topology.
> >>>>>>>>> Or did you mean that there is some kind of topology ID, so that at least we can detect inconsistencies between broker and client-side topologies, and we fence out any member with an incorrect topology ID? Then we seem to end up with mostly the same RPCs and the same questions (how is the topology ID generated?). I agree that the latter could be an option. See summary at the end of this message.
> >>>>>>>>>
> >>>>>>>>> S9. If we flat out refuse topology updates, agree - we cannot let the application crash on minor changes to the topology. However, if we allow topology updates as described in the KIP, there are only upsides to making the topology ID more sensitive. All it will cause is that a client will have to resend a `StreamsGroupInitialize`, and during the rolling bounce, older clients will not get tasks assigned.
> >>>>>>>>>
> >>>>>>>>> S10. The intention in the KIP is really just this - old clients can only retain tasks, but not get new ones. If the topology has sub-topologies, these will initially be assigned to the new clients, which also have the new topology. You are right that rolling an application where the old structure and the new structure are incompatible (e.g. different subtopologies access the same partition) will cause problems. But this will also cause problems in the current protocol, so I'm not sure it's strictly a regression; it's just unsupported (which we can only make the best effort to detect).
> >>>>>>>>>
> >>>>>>>>> In summary, for the topology ID discussion, I mainly see two options:
> >>>>>>>>> 1) stick with the current KIP proposal
> >>>>>>>>> 2) define the topology ID as the hash of the StreamsGroupInitialize topology metadata only, as you suggested, and fence out members with an incompatible topology ID.
> >>>>>>>>> I think doing 2 is also a good option, but in the end it comes down to how important we believe topology updates (where the set of sub-topologies or set of internal topics are affected) to be. The main goal is to make the new protocol a drop-in replacement for the old protocol, and at least define the RPCs in a way that we won't need another KIP which bumps the RPC version to make the protocol production-ready. Adding more command-line tools, or public interfaces seems less critical as it doesn't change the protocol and can be done easily later on. The main question becomes - is the protocol production-ready and sufficient to replace the classic protocol if we go with 2?
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Lucas
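
To make option 2 from the summary above a bit more concrete, here is a rough sketch of what "topology ID as a hash of the topology metadata, plus fencing" could look like. This is not the KIP's definition. The metadata shape (sub-topology name mapped to source and internal topics), the choice of SHA-256, and the names `topology_id` and `fence_members` are all assumptions made up for illustration.

```python
import hashlib
import json


def topology_id(subtopologies):
    """Hypothetical topology ID: SHA-256 over a canonical encoding of the metadata.

    `subtopologies` maps a sub-topology name to a dict with "source_topics" and
    "internal_topics" lists. This shape is an assumption, not the KIP's schema.
    """
    canonical = {
        name: {
            "source_topics": sorted(meta.get("source_topics", [])),
            "internal_topics": sorted(meta.get("internal_topics", [])),
        }
        for name, meta in subtopologies.items()
    }
    # sort_keys makes the encoding independent of dict insertion order.
    encoded = json.dumps(canonical, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(encoded.encode("utf-8")).hexdigest()


def fence_members(group_topology_id, member_topology_ids):
    """Split members into (accepted, fenced) by comparing their topology ID."""
    accepted, fenced = [], []
    for member_id, member_topology_id in sorted(member_topology_ids.items()):
        target = accepted if member_topology_id == group_topology_id else fenced
        target.append(member_id)
    return accepted, fenced


# Illustrative topologies; topic names are made up.
old = {"0": {"source_topics": ["orders"], "internal_topics": ["app-repartition"]}}
reordered = {"0": {"internal_topics": ["app-repartition"], "source_topics": ["orders"]}}
new = {"0": {"source_topics": ["orders", "payments"], "internal_topics": ["app-repartition"]}}

group_id = topology_id(new)
members = {"m1": topology_id(new), "m2": topology_id(old)}
accepted, fenced = fence_members(group_id, members)
print(accepted, fenced)
```

In this sketch a member that still runs the old topology (`m2`) ends up fenced, while reordering topics or keys does not change the ID. Which fields go into the hash decides how sensitive the ID is, which is the open question in S9 and in the summary.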