Just following up to say thank you Lucas for the detailed explanations, and especially the thorough response to my more "existential" questions about building off 848 vs introducing a separate Streams group protocol. This really helped me understand the motivations behind this decision. No notes!
On Fri, Sep 6, 2024 at 7:17 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote: > Hi Sophie, > > thanks for getting deeply involved in this discussion. > > S16. Adding tagged fields would not require an RPC version bump at > all. This should already cover a lot of use cases where request > versioning wouldn't become an issue, as long as it is safe to ignore > the new fields. Other than that, I'm not aware of a formal definition > of what request version bumps mean in relation to Apache Kafka > versions - if anybody knows, please let me know. In my understanding, > the Kafka protocol evolves independently of AK versions, and for the > Kafka protocol to change, you require an accepted KIP, and not an AK > release. The grey area is that a protocol change without an AK release > does not mean much, because even if cloud vendors deploy it before an > AK release, virtually no clients would support it and the protocol > would fall back to the earlier RPC version. So maybe this isn't that > clearly formalized because it does not matter much in practice. > Normally, I would expect an RPC version bump after each KIP that > breaks RPC compatibility. But there seems to be some flexibility here > - for example, the KIP-848 RPCs were changed after the fact in trunk, > because they were not yet exposed by default in AK (or any other > implementation of the Kafka protocol), so a breaking change without a > request version bump was probably not deemed problematic. > > S17/18. We can add an informational table for the interface to the > assignor, and that should also make clear how a client-side assignment > interface would look. Generally, our idea would be to send all the > input information for the task assignor to a client and expect all the > output information of a task assignor in return, very similar to > KIP-848. We did not include such a table because we do not define a > public interface for custom broker-side assignors in this KIP. > But generally, the input / output of the (task) assignor mostly > follows the data fields sent to / from the brokers. Let me do that, > but first I'll respond to the rest of your questions that seem more > important. > > S19. I think an explicit trigger for rebalance could make sense, but > let me explain why we haven't included this in the > current KIP: We don't want to include it for client-side assignment, > obviously, because this KIP does not include client-side assignment. > For triggering a rebalance when a warm-up has caught up, this is > treated differently in the KIP: the task assignor is called when a > client sends updated task offsets. The client does not update its task > offsets continuously, but according to an interval > `task.offset.interval.ms`, and also immediately when a client notices > that a warm-up has caught up. So effectively, it is mostly the client > that is constantly checking the lag. I absolutely believe that the > client should be checking against `acceptable.recovery.lag`, and we are > using the presence of the task offset field to trigger reassignment. > What we weren't sure about is how often we need the task offsets to be > updated and how often we want to retrigger the assignment because of > that. An explicit field for "forcing" a reassignment may still make > sense, so that we can update the task offsets on the broker more often > than we call the assignor. However, I'm not convinced that this should > just be a generic "rebalance trigger", as this seems to be going > against the goals of KIP-848.
Generally, I think one of the principles > of KIP-848 is that a member just describes its current state and > metadata, and the group coordinator makes a decision on whether > reassignment needs to be triggered. So it would seem more natural to > just indicate a boolean "I have caught-up warm-up tasks" to the group > coordinator. I understand that for client-side assignment, you may > want to trigger assignment explicitly without the group coordinator > understanding why, but then we should add such a field as part of the > client-side assignment KIP. > > S20. Yes, inside the group coordinator, the main differences are > topology initialization handling, and the semantics of task assignment > vs. partition assignment. It will not be a completely separate group > coordinator, and we will not change things like the offset reading and > writing. What will have its own code path is the handling of > heartbeats and the reconciliation of the group (so, which tasks are > sent to which member at which point in time until we reach the > declarative target assignment that was produced by a client-side or > server-side assignor). Also, the assignor implementations will > obviously be completely separate from the consumer group assignors, > but that is already the case currently. Streams groups will co-exist > in the group coordinator with share groups (for queues), classic > groups (the old generic protocol for consumers and Connect) and consumer > groups (from KIP-848), and potentially a new group type for Connect as > well. Each of these group types has a separate code path for > heartbeating, reconciling assignments, keeping track of the internal state > of the group and responding to specific requests from the admin API, like > ConsumerGroupDescribe. For streams, there will be a certain amount of > duplication from consumer groups - for example, liveness tracking for > members (setting timers for the session timeout etc.) is essentially > the same between both group types, and the reconciliation logic is quite > similar. We will make an effort to refactor as much of this code as possible in a > way that it can be reused by both group types, and a lot of things > have already been made generic with the introduction of share groups > in the group coordinator. > > S21. It's nice that we are discussing "why are we doing this" in > question 21 :D. I think we are touching upon the reasoning in the KIP, > but let me explain it in more detail. You are right, the > reason we want to implement custom RPCs for Kafka Streams is not about > broker-side vs. client-side assignment. The broker could implement the > Member Metadata versioning scheme of Kafka Streams, intercept consumer > group heartbeats that look like Kafka Streams heartbeats, deserialize > the custom metadata and run a broker-side assignor for Kafka Streams, > and finally serialize custom Kafka Streams assignment metadata, all > based on KIP-848 RPCs. > > First of all, the main difference we see with custom RPCs is really > that streams groups become a first-class concept for both the broker > and the clients of Kafka. This also goes into the answer for S20 (what > is different besides topology initialization and task reconciliation).
> With streams groups, I can use the Admin API call to see the current > assignment, the target assignment, all the metadata necessary for > assignment, and some information about the current topology in a > human-readable and machine-readable form, instead of having to parse > it from logs on some client machine that may or may not have the right > log level configured. This will help humans to understand what is > going on, but it can also power automated tools / CLIs / UIs, and I think > this is a major improvement for operating a streams application. > Again, this is not about who is in control of the broker or the > clients; the information becomes accessible to everybody who can call > the Admin API. It also does not depend on the cloud vendor; it will be > an improvement available for pure AK users, and it will also work with > client-side assignment (although obviously, if there is an intention > to start encoding proprietary data in black-box byte arrays, this > information remains hidden from users of the open source distribution > again). > > I also think that the original proposal of reusing the new consumer > group protocol for Kafka Streams has some downsides that, while not > being deal-breakers, would make the new behaviour of Kafka Streams > less than ideal. One example is the reconciliation: if we have a > custom stream partition assignor (client- or broker-side doesn't > matter), we can only control the target assignment that the group > converges to, but none of the intermediate states that the group > transitions through while attempting to reach the desired end state. > For example, there is no way to prevent the reconciliation loop of > KIP-848 from giving an active task to processX before another instance > on processX has revoked the corresponding standby task. There is no > synchronization point where these movements are done at the same time, > as in the classic protocol, and on the broker we can't prevent this, > because the reconciliation loop in KIP-848 does not know what a > standby task is, or what a process ID is. You could argue that you can let > the application run into LockExceptions in the meantime, but it will > be one of the things that make Kafka Streams work less than > ideally. Another example is task offsets. It is beneficial for streams > partition assignors to have relatively recent task offset information, > which, in KIP-848, needs to be sent to the assignor using a black-box > MemberMetadata byte array. This member metadata byte array is > persisted to the consumer offset topic every time it changes (for > failover). So we end up with a bad trade-off: if we send the task > offset information to the broker very frequently, we will have to > write those member metadata records very frequently as well. If we do > not send them frequently, streams task assignors will have very > outdated lag information. In KIP-1071 we can make the decision to not > persist the task offset information, which avoids this trade-off. It > gives us up-to-date lag information in the usual case, but after > fail-over of the group coordinator, our lag information may be missing > for a short period of time. These are two examples, but I think the > general theme is that, with custom RPCs, Kafka Streams is in control > of how the group metadata is processed and how the tasks are > reconciled.
This is not just a consideration for now, but also for the > future: will KIP-848 reconciliation and metadata management always > work "well enough" for Kafka Streams, or will there be use cases where > we want to evolve and customize the behaviour of Kafka Streams groups > independently of consumer groups? > > A final point that I would like to make: I don't think that custom > streams RPCs are inherently a lot harder to do for the Kafka Streams > community than "just utilizing" KIP-848, at least not as much harder as you > might think. Many things in KIP-848, such as customizable assignment > metadata with a metadata versioning scheme, are made to look like > general-purpose mechanisms, but are effectively mechanisms for Kafka > Streams, and they are also completely unimplemented at this point in any > implementation of the Kafka protocol. We know that there is very > little use of custom partition assignors in Kafka besides Kafka > Streams, and we can expect the same to be true of these new > "customizable" features of KIP-848. So, besides the Kafka Streams > community, there is no one driving the implementation of these > features. "Just utilizing" KIP-848 for Kafka Streams would > effectively mean implementing all this, and still making a lot of deep > changes to Kafka Streams (as mechanisms like probing rebalances etc. > don't make sense in KIP-848). I think adapting KIP-848 to work with > Kafka Streams is a big effort either way. Piggybacking on consumer > group RPCs and extending the consumer group protocol to the point that > it can carry Kafka Streams on its back would not make things > significantly easier. Some things, like online migration from the old > protocol to the new protocol or reimplementing metadata versioning on > top of KIP-848, may even be harder in KIP-848 than in KIP-1071. I > don't think we can consider consumer group RPCs a shortcut to > revamp the streams rebalance protocol; we should instead focus on what > makes Kafka Streams work best down the road. > > Hope that makes sense! > > Cheers, > Lucas > > On Fri, Sep 6, 2024 at 6:47 AM Sophie Blee-Goldman > <sop...@responsive.dev> wrote: > > > > Thanks for another detailed response Lucas! Especially w.r.t. how the > epochs > > are defined. I also went back and re-read KIP-848 to refresh myself; I > > guess it wasn't clear to me how much of the next-gen consumer protocol we > > are reusing vs what's being built from the ground up. > > > > S16. > > > > > It will become interesting when we want to include extra data after the > > > initial protocol definition. > > > > I guess that's what I'm asking us to define -- how do we evolve the > schema > > when expanding the protocol? I think we need to set a few public > contracts > > here, such as whether/which versions get bumped when. For example if I > > add/modify fields twice within a single Kafka release, do we still need > to > > update the version? I think the answer is "yes" since I know you > Confluent > > folks upgrade brokers more often than AK releases, but this is different > > from how e.g. the SubscriptionInfo/AssignmentInfo of the > > StreamsPartitionAssignor works today, hence I want to put all that in > > writing (for those of us who aren't familiar with how all this works > > already and may be modifying it in the future, say, to add client-side > > assignment :P) > > > > Maybe a quick example of what to do to evolve this schema is sufficient?
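Since a quick example is asked for here, the sketch below shows the two evolution paths discussed under S16, in the JSON format Kafka uses to define RPC schemas. It is purely illustrative: the apiKey value, the WarmupTasksCaughtUp and TaskOffsetIntervalMs fields, and their semantics are made up for the example and are not taken from the KIP.

{
  "apiKey": 999,
  "type": "request",
  "listeners": ["broker"],
  "name": "StreamsGroupHeartbeatRequest",
  // Version 1 was only needed because of the regular (non-tagged) field below.
  "validVersions": "0-1",
  // Tagged fields are supported from version 0, so optional data can be added
  // to existing versions without a bump.
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    { "name": "MemberEpoch", "type": "int32", "versions": "0+",
      "about": "The current member epoch; -1 when the member is leaving the group." },
    // Added later as a tagged field: no version bump required, because older
    // implementations can safely ignore an unknown tag.
    { "name": "WarmupTasksCaughtUp", "type": "bool", "versions": "0+",
      "taggedVersions": "0+", "tag": 0, "default": "false",
      "about": "True if a warm-up task on this member has caught up (hypothetical)." },
    // Added later as a regular field: this required bumping validVersions to 0-1,
    // and the field only exists in version 1 and above.
    { "name": "TaskOffsetIntervalMs", "type": "int32", "versions": "1+", "default": "-1",
      "about": "Interval for sending task offsets, in milliseconds (hypothetical)." }
  ]
}

The trade-off mentioned later in the thread applies here: a tagged field costs a few extra bytes for the field tag when it is present, but can be omitted from the wire entirely, while a regular field requires a version bump (and a KIP) but is always encoded.

> > S17.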
> > The KIP seems to throw the assignor, group coordinator, topic > > initialization, and topology versioning all into a single black box on > the > > broker. I understand much of this is intentionally being glossed over as > an > > implementation detail since it's a KIP, but it would really help to tease > > apart these distinct players and define their interactions. For example, > > what are the inputs and outputs to the assignor? For example could we > get a > > table like the "Data Model > > < > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel > >" > > in 848? > > > > KIP-848 goes into detail on these things, but again, it's unclear what > > parts of 848 are and aren't supposed to be carried through. For example > 848 > > has a concept of assignor "reasons" and client-side assignors and > assignor > > metadata which can all trigger a rebalance/group epoch bump. However in > the > > current plan for 1071 none of these exist. Which leads me to my next > > point... > > > > S18. > > Many people use a managed Kafka service like Confluent Cloud where > brokers > > are on the bleeding edge but clients are upgraded less frequently. > However, > > the opposite is also true for a great many users: client applications are > > easy to upgrade and bumped often to get new features, whereas the brokers > > (often run by a different team altogether) are considered riskier to > > upgrade and stay on one version for a long time. We can argue about which > > case is more common -- in my personal experience it's actually the > latter, > > although I only realized this after my time at Confluent which was > > obviously skewed towards meeting Confluent Cloud users -- but regardless, > > we obviously need to support both cases as first-class citizens. And I'm > a > > bit concerned that there's not enough in the KIP to ensure that > client-side > > assignment won't end up getting hacked into the protocol as an > afterthought. > > > > Don't get me wrong, I'm still not demanding you guys implement > client-side > > assignors as part of this KIP. I'm happy to leave the implementation of > > that to a followup, but I feel we need to flesh out the basic direction > for > > this right now, as part of the v0 Streams protocol. > > > > I'm also not asking you to do this alone, and am happy to work with/for > you > > on this. I have one possible proposal sketched out already, although > before > > I share that I just wanted to make sure we're aligned on the fundamentals > > and have mapped out the protocol in full first (ie S17). Just want to > have > > a rough plan for how clients-side assignment will fit into the picture, > and > > right now it's difficult to define because it's unclear where the > > boundaries between the group coordinator and broker-side assignor are (eg > > what are assignor inputs and outputs). > > > > I'm committed to ensuring this doesn't add extra work or delay to this > KIP > > for you guys. > > > > S19. > > I think it could make sense to include a concept like the "assignor > > reason" from 848 as a top-level field and rebalance trigger (ie group > > epoch bump). This would mainly be for the client-side assignor to > trigger a > > rebalance, but for another example, it seems like we want the brokers to > be > > constantly monitoring warmup tasks and processing lag updates to know > when > > a warmup task is to be switched, which is heavy work. 
Why not just let > the > > client who owns the warmup and knows the lag decide when it has warmed up > > and signal the broker? It's a small thing for a client to check when its > > task gets within the acceptable recovery lag and notify via the heartbeat > > if so, but it seems like a rather big deal for the broker to be > constantly > > computing these checks for every task on every hb. Just food for thought, > > I don't really have a strong preference over where the warmup decision > > happens -- simply pointing it out that it fits very neatly into the > > "assignor reason"/"rebalance requested" framework. Making this a > > client-side decision also makes it easier to evolve and customize since > > Kafka Streams users tend to have more control over upgrading/implementing > > stuff in their client application vs the brokers (which is often a > > different team altogether) > > > > S20. > > To what extent do we plan to reuse implementations of 848 in order to > build > > the new Streams group protocol? I'm still having some trouble > understanding > > where we are just adding stuff on top and where we plan to rebuild from > the > > ground up. For example we seem to share most of the epoch reconciliation > > stuff but will have different triggers for the group epoch bump. Is it > > possible to just "plug in" the group epoch and let the consumer's group > > coordinator figure stuff out from there? Will we have a completely > distinct > > Streams coordinator? I know this is an implementation detail but imo it's > > ok to get in the weeds a bit for a large and complex KIP such as this. > > > > Setting aside the topology initialization stuff, is the main difference > > really just in how the group coordinator will be aware of tasks for > Streams > > groups vs partitions for consumer groups? > > > > Which brings me to a high level question I feel I have to ask: > > > > S21. > > Can you go into more detail on why we have to provide Kafka Streams its > own > > RPCs as opposed to just utilizing the protocol from KIP-848? The Rejected > > Alternatives does address this but the main argument there seems to be in > > implementing a broker-side assignor. But can't you do this with 848? And > > given there are users who rarely upgrade their brokers just as there are > > users who rarely upgrade their clients, it seems silly to do all this > work > > and change the fundamental direction of Kafka Streams just to give more > > control over the assignment to the brokers. > > > > For a specific example, the KIP says "Client-side assignment makes it > hard > > to debug and tune the assignment logic" -- personally, I would find it > > infinitely more difficult to tune and debug assignment logic if it were > > happening on the broker. But that's just because I work with the app > devs, > > not the cluster operators. The point is we shouldn't design an entire > > protocol to give preference to one vendor or another -- nobody wants the > > ASF to start yelling at us lol D: > > > > Another example, which imo is a good one (IIUC) is that "a simple > parameter > > change requires redeploying of all streams clients". That's definitely a > > huge bummer, but again, why would broker-side assignors not be able to > > trigger a reassignment based on a custom config in the 848 protocol? We > > could still have the ha and sticky assignors implemented on the brokers, > > and could still define all the Streams configs as broker configs to be > > passed into the streams custom broker assignors. 
That feels like a lot > less > > work than redoing our own group protocol from scratch? > > > > I don't want you guys to think I'm trying to block the entire direction > of > > this KIP. I'd just like to better understand your thoughts on this since > I > > assume you've discussed this at length internally. Just help me > understand > > why this huge proposal is justified (for my own sake and to convince any > > "others" who might be skeptical) > > > > > > On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax <mj...@apache.org> wrote: > > > > > I still need to catch up on the discussion in general. But as promised, > > > I just started KIP-1088 about the `KafkaClientSupplier` question. > > > > > > Looking forward to your feedback on the new KIP. I hope we can get > > > KIP-1088 done soon, to not block this KIP. > > > > > > > > > -Matthias > > > > > > On 9/4/24 09:52, Lucas Brutschy wrote: > > > > Hi Sophie, > > > > > > > > thanks for the questions and comments! > > > > > > > > S1. KIP-1071 does not need to add public interfaces to streams, but > it > > > > does not (yet) make the streams protocol client pluggable. Matthias > > > > promised to write a KIP that will propose a solution for the > > > > KafkaClientSupplier soon. He suggested keeping it separate from this > > > > KIP, but we can treat it as a blocker. You are right that we could > try > > > > fitting an explicit topic initialization in this interface, if we > > > > decide we absolutely need it now, although I'm not sure if that would > > > > actually define a useful workflow for the users to explicitly update > a > > > > topology. Explicit initialization cannot be part of admin tools, > > > > because we do not have the topology there. It could probably be a > > > > config that defaults to off, or a method on the KafkaStreams object? > > > > To me, explicit initialization opens a broad discussion on how to do > > > > it, so I would like to avoid including it in this KIP. I guess it > also > > > > depends on the outcome of the discussion in S8-S10. > > > > > > > > S2. It's hard to define the perfect default, and we should probably > > > > aim for safe operations first - since people can always up the limit. > > > > I changed it to 20. > > > > > > > > S11. I was going to write we can add it, but it's actually quite > > > > independent of KIP-1071 and also a nice newbie ticket, so I would > > > > propose we treat it as out-of-scope and let somebody define it in a > > > > separate KIP - any discussion about it would anyway be lost in this > > > > long discussion thread. > > > > > > > > S12. Member epoch behaves the same as in KIP-848. See the KIP-848 > > > > explanation for basics: > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup > > > > > > > > Together with the section "Reconciliation of the group", this should > > > > make clear how member epochs, group epochs and assignment epochs work > > > > in KIP-1071, but I'll try to summarize the reconciliation a bit more > > > > explicitly below: > > > > > > > > Essentially, the broker drives a reconciliation for each member > > > > separately. The goal is to reconcile the assignment generated by the > > > > assignor and reach the assignment epoch, but it's an asynchronous > > > > process, so each member may still be on an older epoch, which is the > > > > member epoch. 
We do not reinvent any of the internal machinery for > > > > streams here, we are just adding standby tasks and warm-up tasks to > > > > the reconciliation of KIP-848. > > > > > > > > During reconciliation, the broker first asks the member to revoke all > > > > tasks that are not in its target assignment, by removing it from the > > > > current assignment for the member (which is provided in the heartbeat > > > > response). Until revocation is completed, the member remains on the > > > > old member epoch and in `UNREVOKED` state. Only once the member > > > > confirms revocation of these tasks by removing them from the set of > > > > owned tasks in the heartbeat request, the member can transition to > the > > > > current assignment epoch and get new tasks assigned. Once the member > > > > has revoked all tasks that are not in its target assignment, it > > > > transitions to the new epoch and gets new tasks assigned. There are > > > > some tasks that may be in the (declarative) target assignment of the > > > > member generated by the assignor, but that may not be immediately > > > > added to the current assignment of the member, because they are still > > > > "unreleased". Unreleased tasks are: > > > > > > > > - A. For any task, another instance of the same process may own the > > > > task in any role (active/standby/warm-up), and we are still awaiting > > > > revocation. > > > > > > > > - B. For an active task, an instance of any process may own the same > > > > active task, and we are still awaiting revocation. > > > > > > > > As long as there is at least one unreleased task, the member remains > > > > in state UNRELEASED. Once all tasks of the target assignment are > added > > > > to the current assignment of the member (because they were released), > > > > the member transitions to STABLE. > > > > > > > > Technically, the first constraint (we cannot assign a task in two > > > > different roles to the same process) is only relevant for stateful > > > > tasks that use the local state directory. I wonder if we should relax > > > > this restriction slightly, by marking sub-topologies that access the > > > > local state directory. This information could also be used by the > > > > assignor. But we can also introduce as a follow-up improvement. > > > > > > > > S13i. You are right, these bumps of the group epoch are "on top" of > > > > the KIP-848 reasons for bumping the group epoch. I added the KIP-848 > > > > reasons as well, because this was a bit confusing in the KIP. > > > > > > > > S13ii. The group epoch is bumped immediately once a warm-up task has > > > > caught up, since this is what triggers running the assignor. However, > > > > the assignment epoch is only bumped once the task assignor has run. > If > > > > the task assignment for a member hasn't changed, it will transition > to > > > > the new assignment epoch immediately. > > > > > > > > S13iii. In the case of a rolling bounce, the group epoch will be > > > > bumped for every member that leaves and joins the group. For updating > > > > rack ID, client tags, topology ID etc. -- this will mostly play a > role > > > > for static membership, as you mentioned, but would cause the same > > > > thing - in a rolling bounce with static membership, we will attempt > to > > > > recompute the assignment after every bounce, if the topology ID is > > > > changed in the members. This can cause broker load, as you mentioned. > > > > Note, however, that we don't have to compute an assignment for every > > > > group epoch bump. 
While KIP-848 computes its assignment as part of > the > > > > main loop of the group coordinator, it uses asynchronous assignment > in > > > > the case of client-side assignment, where the assignment is slower. > So > > > > it will kick off the assignment on the client side, enter state > > > > `ASSIGNING`, but may have more group epoch bumps in the meantime > while > > > > the assignment is computed on the client side. We leave it open to > use > > > > the same mechanism on the broker to compute assignment asynchronously > > > > on a separate thread, in case streams assignment is too slow for the > > > > main loop of the group coordinator. We will also consider > > > > rate-limiting the assignor to reduce broker load. However, it's too > > > > early to define this in detail, as we don't know much about the > > > > performance impact yet. > > > > > > > > S13iv. The heartbeat just sends "null" instead of the actual value in > > > > case the value didn't change since the last heartbeat, so no > > > > comparison is required. This is defined in the heartbeat RPC and the > > > > same as in KIP-848. > > > > > > > > S13v. I think I disagree. The group epoch represents a version of the > > > > group state, including the current topology and the metadata of all > > > > members -- the whole input to the task assignor -- and defines > exactly > > > > the condition under which we attempt to recompute the assignment. So > > > > it's not overloaded, and has a clear definition. It's also exactly > the > > > > same as in KIP-848, and we do not want to move away too far from that > > > > protocol, unless it's something that is specific to streams that need > > > > to be changed. Also, the protocol already uses three types of epochs > - > > > > member epoch, assignment epoch, group epoch. I don't think adding > more > > > > epochs will make things clearer. Unless we have a strong reason why > we > > > > need to add extra epochs, I'm not in favor. > > > > > > > > S14. Assignment epoch is the same as in KIP-848: > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment > > > > > > > > Its definition does not change at all, so we didn't describe it in > > > > detail in KIP-1071. > > > > > > > > S15. I agree that the name is a bit generic, but the time to change > > > > this was when `group.instance.id` was introduced. Also, if we start > > > > diverging from regular consumers / consumer groups in terms of > naming, > > > > it will not make things clearer. "application.id" and "group.id" are > > > > bad enough, in my eyes. I hope it's okay if I just improve the field > > > > documentation. > > > > > > > > S16. This goes back to KIP-482. Essentially, in flexible request > > > > schema versions you can define optional fields that will be encoded > > > > with a field tag on the wire, but can also be completely omitted. > > > > This is similar to how fields are encoded in, e.g., protobuf. It > > > > improves schema evolution, because we can add new fields to the > schema > > > > without bumping the version. There is also a minor difference in > > > > encoding size: when encoded, tagged fields take up more bytes on the > > > > wire because of the field tag, but they can be omitted completely. It > > > > will become interesting when we want to include extra data after the > > > > initial protocol definition. 
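To make the reconciliation walk-through under S12 above a bit more concrete, here is a minimal, purely illustrative sketch of the two "unreleased task" rules (A and B). The types and names are hypothetical and are not the actual group coordinator code.

import java.util.Map;
import java.util.Set;

// Hypothetical types for illustration only; not the actual group coordinator code.
enum TaskRole { ACTIVE, STANDBY, WARMUP }

record TaskId(String subtopology, int partition) {}
record Owner(String memberId, String processId, TaskRole role) {}

final class UnreleasedTaskCheck {

    // A task in a member's target assignment is "unreleased" (and therefore not yet
    // added to the member's current assignment) if we are still awaiting revocation
    // from an owner that conflicts according to rule A or rule B above.
    static boolean isUnreleased(TaskId task,
                                TaskRole targetRole,
                                String targetMemberId,
                                String targetProcessId,
                                Map<TaskId, Set<Owner>> currentOwners) {
        for (Owner owner : currentOwners.getOrDefault(task, Set.of())) {
            if (owner.memberId().equals(targetMemberId)) {
                continue; // the target member itself is not a conflicting owner
            }
            // Rule A: another instance of the same process owns the task in any role.
            if (owner.processId().equals(targetProcessId)) {
                return true;
            }
            // Rule B: the task is targeted as active and an instance of any process
            // still owns it as an active task.
            if (targetRole == TaskRole.ACTIVE && owner.role() == TaskRole.ACTIVE) {
                return true;
            }
        }
        return false;
    }
}

Per the description above, a member would remain in UNRELEASED as long as any task in its target assignment satisfies this check, and transition to STABLE once all such tasks have been released.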
> > > > > > > > Cheers, > > > > Lucas > > > > > > > > On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman > > > > <sop...@responsive.dev> wrote: > > > >> > > > >> Ah, my bad -- I thought I refreshed the page to get the latest > version > > > >> which is why I was a bit confused when I couldn't find anything > about > > > the > > > >> new tools which I had previously seen in the KIP. Sorry for the > > > confusion > > > >> and unnecessary questions > > > >> > > > >> S1. > > > >> > > > >>> You could imagine calling the initialize RPC > > > >>> explicitly (to implement explicit initialization), but this would > > > >>> still mean sending an event to the background thread, and the > > > >>> background thread in turn invokes the RPC. However, explicit > > > >>> initialization would require some additional public interfaces > that we > > > >>> are not including in this KIP. > > > >> > > > >> I'm confused -- if we do all the group management stuff in a > background > > > >> thread to avoid needing public APIs, how do we pass all the > > > >> Streams-specific and app-specific info to the broker for the > > > >> StreamsGroupInitialize? I am guessing this is where we need to > invoke > > > >> internal APIs and is related to our discussion about removing the > > > >> KafkaClientSupplier. However, it seems to me that no matter how you > > > look at > > > >> it, Kafka Streams will need to pass information to and from the > > > background > > > >> thread if the background thread is to be aware of things like tasks, > > > offset > > > >> sums, repartition topics, etc > > > >> > > > >> We don't really need to rehash everything here since it seems like > the > > > >> proposal to add a StreamsConsumer/StreamsAssignment interface will > > > address > > > >> this, which I believe we're in agreement on. I just wanted to point > out > > > >> that this work to replace the KafkaClientSupplier is clearly > intimately > > > >> tied up with KIP-1071, and should imho be part of this KIP and not > its > > > own > > > >> standalone thing. > > > >> > > > >> By the way I'm happy to hop on a call to help move things forward on > > > that > > > >> front (or anything really). Although I guess we're just waiting on a > > > design > > > >> right now? > > > >> > > > >> S2. I was suggesting something lower for the default upper limit on > all > > > >> groups. I don't feel too strongly about it, just wanted to point out > > > that > > > >> the tradeoff is not about the assignor runtime but rather about > resource > > > >> consumption, and that too many warmups could put undue load on the > > > cluster. > > > >> In the end if we want to trust application operators then I'd say > it's > > > fine > > > >> to use a higher cluster-wide max like 100. > > > >> > > > >> S8-10 I'll get back to you on this in another followup since I'm > still > > > >> thinking some things through and want to keep the discussion > rolling for > > > >> now. In the meantime I have some additional questions: > > > >> > > > >> S11. One of the main issues with unstable assignments today is the > fact > > > >> that assignors rely on deterministic assignment and use the process > Id, > > > >> which is not configurable and only persisted via local disk in a > > > >> best-effort attempt. It would be a very small change to include > this in > > > >> KIP-1071 (like 5 LOC), WDYT? (Would even be willing to do the PR for > > > this > > > >> myself so as not to add to your load). 
There's a ticket for it here: > > > >> https://issues.apache.org/jira/browse/KAFKA-15190 > > > >> > > > >> S12. What exactly is the member epoch and how is it defined? When > is it > > > >> bumped? I see in the HB request field that certain values signal a > > > member > > > >> joining/leaving, but I take it there are valid positive values > besides > > > the > > > >> 0/-1/-2 codes? More on this in the next question: > > > >> > > > >> S13. The KIP says the group epoch is updated in these three cases: > > > >> a. Every time the topology is updated through the > > > StreamsGroupInitialize API > > > >> b. When a member with an assigned warm-up task reports a task > changelog > > > >> offset and task changelog end offset whose difference is less that > > > >> acceptable.recovery.lag. > > > >> c. When a member updates its topology ID, rack ID, client tags or > > > process > > > >> ID. Note: Typically, these do not change within the lifetime of a > > > Streams > > > >> client, so this only happens when a member with static membership > > > rejoins > > > >> with an updated configuration. > > > >> > > > >> S13.i First, just to clarify, these are not the *only* times the > group > > > >> epoch is bumped but rather the additional cases on top of the > regular > > > >> consumer group protocol -- so it's still bumped when a new member > joins > > > or > > > >> leaves, etc, right? > > > >> S13.ii Second, does (b) imply the epoch is bumped just whenever the > > > broker > > > >> notices a task has finished warming up, or does it first reassign > the > > > task > > > >> and then bump the group epoch as a result of this task reassignment? > > > >> S13.iii Third, (a) mentions the group epoch is bumped on each > > > >> StreamsGroupInitialize API but (c) says it gets bumped whenever a > member > > > >> updates its topology ID. Does this mean that in the event of a > rolling > > > >> upgrade that changes the topology, the group epoch is bumped just > once > > > or > > > >> for each member that updates its topology ID? Or does the "updating > its > > > >> topology ID" somehow have something to do with static membership? > > > >> S13.iv How does the broker know when a member has changed its rack > ID, > > > >> client tags, etc -- does it compare these values on every hb?? That > > > seems > > > >> like a lot of broker load. If this is all strictly for static > membership > > > >> then have we considered leaving static membership support for a > followup > > > >> KIP? Not necessarily advocating for that, just wondering if this was > > > >> thought about already. Imo it would be acceptable to not support > static > > > >> membership in the first version (especially if we fixed other issues > > > like > > > >> making the process Id configurable to get actual stable > assignments!) > > > >> S13.v I'm finding the concept of group epoch here to be somewhat > > > >> overloaded. It sounds like it's trying to keep track of multiple > things > > > >> within a single version: the group membership, the topology > version, the > > > >> assignment version, and various member statuses (eg rack ID/client > > > tags). > > > >> Personally, I think it would be extremely valuable to unroll all of > this > > > >> into separate epochs with clear definitions and boundaries and > triggers. 
> > > >> For example, I would suggest something like this: > > > >> > > > >> group epoch: describes the group at a point in time, bumped when > set of > > > >> member ids changes (ie static or dynamic member leaves or joins) -- > > > >> triggered by group coordinator noticing a change in group membership > > > >> topology epoch: describes the topology version, bumped for each > > > successful > > > >> StreamsGroupInitialize that changes the broker's topology ID -- > > > triggered > > > >> by group coordinator bumping the topology ID (ie > StreamsGroupInitialize) > > > >> (optional) member epoch: describes the member version, bumped when a > > > >> memberId changes its rack ID, client tags, or process ID (maybe > topology > > > >> ID, not sure) -- is this needed only for static membership? Going > back > > > to > > > >> question S12, when is the member epoch is bumped in the current > > > proposal? > > > >> assignment epoch: describes the assignment version, bumped when the > > > >> assignor runs successfully (even if the actual assignment of tasks > does > > > not > > > >> change) -- can be triggered by tasks completing warmup, a manual > > > >> client-side trigger (future KIP only), etc (by definition is bumped > any > > > >> time any of the other epochs are bumped since they all trigger a > > > >> reassignment) > > > >> > > > >> S14. The KIP also mentions a new concept of an "assignment epoch" > but > > > >> doesn't seem to ever elaborate on how this is defined and what it's > used > > > >> for. I assume it's bumped each time the assignment changes and > > > represents a > > > >> sort of "assignment version", is that right? Can you describe in > more > > > >> detail the difference between the group epoch and the assignment > epoch? > > > >> > > > >> S15. I assume the "InstanceId" field in the HB request corresponds > to > > > the > > > >> group.instance.id config of static membership. I always felt this > field > > > >> name was too generic and easy to get mixed up with other > identifiers, > > > WDYT > > > >> about "StaticId" or "StaticInstanceId" to make the static membership > > > >> relation more explicit? > > > >> > > > >> S16. what is the "flexibleVersions" field in the RPC protocols? I > assume > > > >> this is related to versioning compatibility but it's not quite clear > > > from > > > >> the KIP how this is to be used. For example if we modify the > protocol by > > > >> adding new fields at the end, we'd bump the version but should the > > > >> flexibleVersions field change as well? > > > >> > > > >> On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy > > > >> <lbruts...@confluent.io.invalid> wrote: > > > >> > > > >>> Hi Sophie, > > > >>> > > > >>> Thanks for your detailed comments - much appreciated! I think you > read > > > >>> a version of the KIP that did not yet include the admin > command-line > > > >>> tool and the Admin API extensions, so some of the comments are > already > > > >>> addressed in the KIP. > > > >>> > > > >>> S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in > the > > > >>> consumer background thread. Note that in the new consumer threading > > > >>> model, all RPCs are run by the background thread. Check out this: > > > >>> > > > >>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design > > > >>> for more information. Both our RPCs are just part of the group > > > >>> membership management and do not need to be invoked explicitly by > the > > > >>> application thread. 
You could imagine calling the initialize RPC > > >>> explicitly (to implement explicit initialization), but this would > > >>> still mean sending an event to the background thread, and the > > >>> background thread in turn invokes the RPC. However, explicit > > >>> initialization would require some additional public interfaces that we > > >>> are not including in this KIP. StreamsGroupDescribe is called by the > > >>> AdminClient, and used by the command-line tool > > >>> kafka-streams-groups.sh. > > >>> > > >>> S2. I think the max.warmup.replicas=100 suggested by Nick was intended > > >>> as the upper limit for setting the group configuration on the broker. > > >>> Just to make sure that this was not a misunderstanding. By default, > > >>> values above 100 should be rejected when setting a specific value for a > > >>> group. Are you suggesting 20 or 30 for the default value for groups, > > >>> or for the default upper limit of the group configuration? > > >>> > > >>> S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The > > >>> MemberEpoch=-1 is a left-over from an earlier discussion. It means > > >>> that the member is leaving the group, so the intention was that the > > >>> member must leave the group when it asks the other members to shut > > >>> down. We later reconsidered this and decided that all applications > > >>> should just react to the shutdown application signal that is returned > > >>> by the broker, so the client first sets the ShutdownApplication and > > >>> later leaves the group. Thanks for spotting this, I removed it. > > >>> > > >>> S4. Not sure if this refers to the latest version of the KIP. We added > > >>> an extension of the admin API and a kafka-streams-groups.sh > > >>> command-line tool to the KIP already. > > >>> > > >>> S5. All RPCs for dealing with offsets will keep working with streams > > >>> groups. The extension of the admin API is rather cosmetic, since the > > >>> method names use "consumer group". The RPCs, however, are generic and > > >>> do not need to be changed. > > >>> > > >>> S6. Yes, you can use the DeleteGroup RPC with any group ID, whether it > > >>> is a streams group or not. > > >>> > > >>> S7. See the admin API section. > > >>> > > >>> S8. I guess for both A and B, I am not sure what you are suggesting. > > >>> Do you want to make the broker-side topology immutable and not include > > >>> any information about the topology, like the topology ID, in the RPC? > > >>> It would seem that this would be a massive foot-gun for people, if > > >>> they start changing their topology and don't notice that the broker is > > >>> looking at a completely different version of the topology. Or did you > > >>> mean that there is some kind of topology ID, so that at least we can > > >>> detect inconsistencies between broker and client-side topologies, and > > >>> we fence out any member with an incorrect topology ID? Then we seem to > > >>> end up with mostly the same RPCs and the same questions (how is the > > >>> topology ID generated?). I agree that the latter could be an option. > > >>> See the summary at the end of this message. > > >>> > > >>> S9. If we flat out refuse topology updates, I agree - we cannot let the > > >>> application crash on minor changes to the topology.
However, if we > > > >>> allow topology updates as described in the KIP, there are only > upsides > > > >>> to making the topology ID more sensitive. All it will cause is > that a > > > >>> client will have to resend a `StreamsGroupInitialize`, and during > the > > > >>> rolling bounce, older clients will not get tasks assigned. > > > >>> > > > >>> S10. The intention in the KIP is really just this - old clients can > > > >>> only retain tasks, but not get new ones. If the topology has > > > >>> sub-topologies, these will initially be assigned to the new > clients, > > > >>> which also have the new topology. You are right that rolling an > > > >>> application where the old structure and the new structure are > > > >>> incompatible (e.g. different subtopologies access the same > partition) > > > >>> will cause problems. But this will also cause problems in the > current > > > >>> protocol, so I'm not sure, if it's strictly a regression, it's just > > > >>> unsupported (which we can only make the best effort to detect). > > > >>> > > > >>> In summary, for the topology ID discussion, I mainly see two > options: > > > >>> 1) stick with the current KIP proposal > > > >>> 2) define the topology ID as the hash of the StreamsGroupInitialize > > > >>> topology metadata only, as you suggested, and fence out members > with > > > >>> an incompatible topology ID. > > > >>> I think doing 2 is also a good option, but in the end it comes > down to > > > >>> how important we believe topology updates (where the set of > > > >>> sub-topologies or set of internal topics are affected) to be. The > main > > > >>> goal is to make the new protocol a drop-in replacement for the old > > > >>> protocol, and at least define the RPCs in a way that we won't need > > > >>> another KIP which bumps the RPC version to make the protocol > > > >>> production-ready. Adding more command-line tools, or public > interfaces > > > >>> seems less critical as it doesn't change the protocol and can be > done > > > >>> easily later on. The main question becomes - is the protocol > > > >>> production-ready and sufficient to replace the classic protocol if > we > > > >>> go with 2? > > > >>> > > > >>> Cheers, > > > >>> Lucas > > > >>> > > > > >
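As an illustration of option 2 in the summary above (deriving the topology ID from the StreamsGroupInitialize topology metadata only), here is a minimal sketch of what such a derivation could look like. The metadata shape, the field names, and the choice of SHA-256 are assumptions for illustration, not something specified in the KIP.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HexFormat;
import java.util.List;
import java.util.TreeMap;

// Hypothetical shape of the topology metadata sent in StreamsGroupInitialize;
// the real RPC fields are defined in the KIP, not here.
record Subtopology(String name, List<String> sourceTopics, List<String> internalTopics) {}

final class TopologyIdSketch {

    // Derive a deterministic ID from the initialize metadata only, so that every
    // client (and the broker) computing it over the same topology gets the same value.
    static String topologyId(List<Subtopology> subtopologies) throws Exception {
        // Sort by sub-topology name so the result does not depend on iteration order.
        TreeMap<String, Subtopology> sorted = new TreeMap<>();
        for (Subtopology s : subtopologies) {
            sorted.put(s.name(), s);
        }
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        for (Subtopology s : sorted.values()) {
            update(digest, s.name());
            for (String topic : s.sourceTopics()) {
                update(digest, topic);
            }
            for (String topic : s.internalTopics()) {
                update(digest, topic);
            }
        }
        return HexFormat.of().formatHex(digest.digest());
    }

    // Append a string plus a delimiter byte, so that concatenation is unambiguous.
    private static void update(MessageDigest digest, String value) {
        digest.update(value.getBytes(StandardCharsets.UTF_8));
        digest.update((byte) 0);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(topologyId(List.of(
            new Subtopology("sub-0", List.of("input"), List.of("app-repartition")))));
    }
}

The sensitivity of such an ID is determined exactly by which parts of the metadata are fed into the hash, which is the trade-off discussed under S8-S10 above.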