Just following up to say thank you Lucas for the detailed explanations, and
especially the thorough response to my more "existential" questions about
building off 848 vs introducing a separate Streams group protocol. This
really helped me understand the motivations behind this decision. No notes!

On Fri, Sep 6, 2024 at 7:17 AM Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:

> Hi Sophie,
>
> thanks for getting deeply involved in this discussion.
>
> S16. Adding tagged fields would not require an RPC version bump at
> all. This should already a cover a lot of use cases where requestion
> versioning wouldn't become an issue, as long as it is safe to ignore
> the new fields. Other than that, I'm not aware of a formal definition
> of the what request version bumps mean in relation to Apache Kafka
> versions - if anybody knows, please let me know. In my understanding,
> the Kafka protocol evolves independently of AK versions, and for the
> Kafka protocol to change, you require an accepted KIP, and not an AK
> release. The grey area is that a protocol change without an AK release
> does not mean much, because even if cloud vendors deploy it before an
> AK release, virtually no clients would support it and the protocol
> would fall back to the earlier RPC version. So maybe this isn't that
> clearly formalized because it does not matter much in practice.
> Normally, I would expect an RPC version bump after each KIP that
> breaks RPC compatibility. But there seems to be some flexibility here
> - for example, the KIP-848 RPCs were changed after the fact in trunk,
> because they were not exposed yet by default in AK (or any other
> implementation of the Kafka protocol), so a breaking change without
> request version bump was probably not deemed problematic.
>
> S17/18. We can add an informational table for the interface to the
> assignor, and that should also make clear how a client-side assignment
> interface would look. Generally, our idea would be to send all the
> input information for the task assignor to a client and expect all the
> output information of a task assignor in return, very similar to
> KIP-848. We did not include such a table because we do not define a
> public interface to define custom broker-side assignors in this KIP.
> But generally, the input / output of the (task) assignor mostly
> follows the data fields sent to / from the brokers. Let me do that,
> but first I'll respond to the rest of your questions that seem more
> important.
>
> S19. I think an explicit trigger for rebalance could make sense, but
> let me explain the thinking why we haven't included this in the
> current KIP: We don't want to include it for client-side assignment
> obviously, because this KIP does not include client-side assignment.
> For triggering a rebalance when a warm-up has caught up, this is
> treated differently in the KIP: the task assignor is called when a
> client sends updated task offsets. The client does not update its task
> offsets regularly, but according to an interval
> `task.offset.interval.ms`, but also immediately when a client notices
> that a warm-up has caught up. So effectively, it is mostly the client
> that is constantly checking the lag. I absolutely believe that the
> client should be checking for `acceptable.recovery.lag`, and we are
> using the presence of the task offset field to trigger reassignment.
> What we weren't sure is how often we need the task offsets to be
> updated and how often we want to retrigger the assignment because of
> that. An explicit field for "forcing" a reassignment may still make
> sense, so that we can update the task offsets on the broker more often
> than we call the assignor. However, I'm not convinced that this should
> just be a generic "rebalance trigger", as this seems to be going
> against the goals of KIP-848. Generally, I think one of the principles
> of KIP-848 is that a member just describes its current state and
> metadata, and the group coordinator makes a decision on whether
> reassignment needs to be triggered. So it would seem more natural to
> just indicate a boolean "I have caught-up warm-up tasks" to the group
> coordinator. I understand that for client-side assignment, you may
> want to trigger assignment explicitely without the group coordinator
> understanding why, but then we should add such a field as part of the
> client-side assignment KIP.
>
> S20. Yes, inside the group coordinator, the main differences are
> topology initialization handling, and the semantics of task assignment
> vs. partition assignment. It will not be a completely separate group
> coordinator, and we will not change things like the offset reading and
> writing. What will have its own code path is the handling of
> heartbeats and the reconciliation of the group (so, which tasks are
> sent to which member at which point in time until we reach the
> declarative target assignment that was produced by a client-side or
> server-side assignor). Also, the assignor implementations will
> obviously be completely separate from the consumer group assignors,
> but that is already the case currently. Streams groups will co-exist
> in the group coordinator with share groups (for queues), classic
> groups (old generic protocol for consumers and connect) and consumer
> groups (from KIP-848), and potentially a new group type for connect as
> well. Each of these group types have a separate code path for heart
> beating, reconciling assignments, keeping track of the internal state
> of the group and responding to specific requests from admin API, like
> ConsumerGroupDescribe. For streams, there will be a certain amount of
> duplication from consumer groups - for example, liveness tracking for
> members (setting timers for the session timeout etc.) is essentially
> the same between both groups, and the reconciliation logic is quite
> similar. We will make an effort to refactor as much of this code in a
> way that it can be reused by both group types, and a lot of things
> have already been made generic with the introduction of share groups
> in the group coordinator.
>
> S21. It's nice that we are discussing "why are we doing this" in
> question 21 :D. I think we are touching upon the reasoning in the KIP,
> but let me explain the reasoning in more detail. You are right, the
> reason we want to implement custom RPCs for Kafka Streams is not about
> broker-side vs. client-side assignment. The broker could implement the
> Member Metadata versioning scheme of Kafka Streams, intercept consumer
> group heartbeats that look like Kafka Streams heartbeats, deserialize
> the custom metadata and run a broker-side assignor for Kafka Streams,
> and finally serialize custom Kafka Streams assignment metadata, all
> based on KIP-848 RPCs.
>
> First of all, the main difference we see with custom RPCs is really
> that streams groups become a first-level concept for both the broker
> and clients of the Kafka. This also goes into the answer for S20 (what
> is different beside topology initialization and task reconciliation).
> With streams groups, I can use the Admin API call to see the current
> assignment, the target assignment, all the metadata necessary for
> assignment, and some information about the current topology in a
> human-readable and machine-readable form, instead of having to parse
> it from logs on some client machine that may or may not have the right
> log-level configured. This will help humans to understand what is
> going on, but can also power automated tools / CLIs / UIs, and I think
> this is a major improvement for operating a streams application.
> Again, this is not about who is in control of the broker or the
> clients, the information becomes accessible for everybody who can call
> the Admin API. It also does not depend on the cloud vendor, it will be
> an improvement available for pure AK users, and will also work with
> client-side assignment (although obviously, if there is an intention
> to start encoding proprietary data in black-box byte-arrays, this
> information remains hidden to users of the open source distribution
> again).
>
> I also think that the original proposal of reusing the new consumer
> group protocol for Kafka Streams has some downsides, that, while not
> being deal-breakers, would make the new behaviour of Kafka Streams
> less than ideal. One example is the reconciliation: if we have a
> custom stream partition assignor (client- or broker-side doesn't
> matter), we can only control the target assignment that the group
> converges to, but none of the intermediate states that the group
> transitions through while attempting to reach the desired end state.
> For example, there is no way to prevent the reconciliation loop of
> KIP-848 from giving an active task to processX before another instance
> on processX has revoked the corresponding standby task. There is no
> synchronization point where these movements are done at the same time
> as in the classic protocol, and on the broker we can't prevent this,
> because the reconciliation loop in KIP-848 does not know what a
> standby task is, or what a process ID is. You could argue, you can let
> the application run into LockExceptions in the meantime, but it will
> be one of the things that will make Kafka Streams work less than
> ideal. Another example is task offsets. It is beneficial for streams
> partition assignors to have relatively recent task offset information,
> which, in KIP-848, needs to be sent to the assignor using a black-box
> MemberMetadata byte array. This member metadata byte array is
> persisted to the consumer offset topic every time it changes (for
> failover). So we end up with a bad trade-off: if we send the task
> offset information to the broker very frequently, we will have to
> write those member metadata records very frequently as well. If we do
> not send them frequently, streams task assignors will have very
> outdated lag information. In KIP-1071 we can make the decision to not
> persist the task offset information, which avoids this trade-off. It
> gives us up-to-date lag information in the usual case, but after
> fail-over of the group coordinator, our lag information may be missing
> for a short period of time. These are two examples, but I think the
> general theme is, that with custom RPCs, Kafka Streams is in control
> of how the group metadata is processed and how the tasks are
> reconciled. This is not just a consideration for now, but also the
> future: will KIP-848 reconciliation and metadata management always
> work "good enough" for Kafka Streams, or will there be use cases where
> we want to evolve and customize the behaviour of Kafka Streams groups
> independently of consumer groups.
>
> A final point that I would like to make, I don't think that custom
> streams RPCs are inherently a lot harder to do for the Kafka Streams
> community than "just utilizing" KIP-848, at least not as much as you
> would think. Many things in KIP-848, such customizable assignment
> metadata with a metadata versioning scheme are made to look like
> general-purpose mechanisms, but are effectively mechanisms for Kafka
> Streams and also completely unimplemented at this point in any
> implementation of the Kafka protocol. We know that there is very
> little use of custom partition assignors in Kafka besides Kafka
> Streams, and we can expect the same to be true of these new
> "customizable" features of KIP-848. So, besides the Kafka Streams
> community, there is no one driving the implementation of these
> features.  "Just utilizing" KIP-848 for Kafka Streams would
> effectively mean to implement all this, and still make a lot of deep
> changes to Kafka Streams (as mechanisms like probing rebalances etc.
> don't make sense in KIP-848). I think adapting KIP-848 to work with
> Kafka Streams either way is a big effort. Piggybacking on consumer
> group RPCs and extending the consumer group protocol to the point that
> it can carry Kafka Streams on its back would not make things
> significantly easier. Some things, like online migration from the old
> protocol to the new protocol or reimplementing metadata versioning on
> top of KIP-848, may even be harder in KIP-848, than in KIP-1071. I
> don't think we can consider consumer group RPCs as a shortcut to
> revamp the streams rebalance protocol, but should be focussing on what
> makes Kafka Streams work best down the road.
>
> Hope that makes sense!
>
> Cheers,
> Lucas
>
> On Fri, Sep 6, 2024 at 6:47 AM Sophie Blee-Goldman
> <sop...@responsive.dev> wrote:
> >
> > Thanks for another detailed response Lucas! Especially w.r.t how the
> epochs
> > are defined. I also went back and re-read KIP-848 to refresh myself, I
> > guess it wasn't clear to me how much of the next-gen consumer protocol we
> > are reusing vs what's being built from the ground up.
> >
> > S16.
> >
> > >  It will become interesting when we want to include extra data after
> the
> > > initial protocol definition.
> >
> > I guess that's what I'm asking us to define -- how do we evolve the
> schema
> > when expanding the protocol? I think we need to set a few public
> contracts
> > here, such as whether/which versions get bumped when. For example if I
> > add/modify fields twice within a single Kafka release, do we still need
> to
> > update the version? I think the answer is "yes" since I know you
> Confluent
> > folks upgrade brokers more often than AK releases, but this is different
> > from how eg the SubscriptionInfo/AssignmentInfo of the
> > StreamsPartitionAssignor works today, hence I want to put all that in
> > writing (for those of us who aren't familiar with how all this works
> > already and may be modifying it in the future, say, to add client-side
> > assignment :P)
> >
> > Maybe a quick example of what to do to evolve this schema is sufficient?
> >
> > S17.
> > The KIP seems to throw the assignor, group coordinator, topic
> > initialization, and topology versioning all into a single black box on
> the
> > broker. I understand much of this is intentionally being glossed over as
> an
> > implementation detail since it's a KIP, but it would really help to tease
> > apart these distinct players and define their interactions. For example,
> > what are the inputs and outputs to the assignor? For example could we
> get a
> > table like the "Data Model
> > <
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel
> >"
> > in 848?
> >
> > KIP-848 goes into detail on these things, but again, it's unclear what
> > parts of 848 are and aren't supposed to be carried through. For example
> 848
> > has a concept of assignor "reasons" and client-side assignors and
> assignor
> > metadata which can all trigger a rebalance/group epoch bump. However in
> the
> > current plan for 1071 none of these exist. Which leads me to my next
> > point...
> >
> > S18.
> > Many people use a managed Kafka service like Confluent Cloud where
> brokers
> > are on the bleeding edge but clients are upgraded less frequently.
> However,
> > the opposite is also true for a great many users: client applications are
> > easy to upgrade and bumped often to get new features, whereas the brokers
> > (often run by a different team altogether) are considered riskier to
> > upgrade and stay on one version for a long time. We can argue about which
> > case is more common -- in my personal experience it's actually the
> latter,
> > although I only realized this after my time at Confluent which was
> > obviously skewed towards meeting Confluent Cloud users -- but regardless,
> > we obviously need to support both cases as first-class citizens. And I'm
> a
> > bit concerned that there's not enough in the KIP to ensure that
> client-side
> > assignment won't end up getting hacked into the protocol as an
> afterthought.
> >
> > Don't get me wrong, I'm still not demanding you guys implement
> client-side
> > assignors as part of this KIP. I'm happy to leave the implementation of
> > that to a followup, but I feel we need to flesh out the basic direction
> for
> > this right now, as part of the v0 Streams protocol.
> >
> > I'm also not asking you to do this alone, and am happy to work with/for
> you
> > on this. I have one possible proposal sketched out already, although
> before
> > I share that I just wanted to make sure we're aligned on the fundamentals
> > and have mapped out the protocol in full first (ie S17). Just want to
> have
> > a rough plan for how clients-side assignment will fit into the picture,
> and
> > right now it's difficult to define because it's unclear where the
> > boundaries between the group coordinator and broker-side assignor are (eg
> > what are assignor inputs and outputs).
> >
> > I'm committed to ensuring this doesn't add extra work or delay to this
> KIP
> > for you guys.
> >
> > S19.
> > I think it could make sense to include a concept like the "assignor
> > reason"  from 848  as a top-level field and rebalance trigger (ie group
> > epoch bump). This would mainly be for the client-side assignor to
> trigger a
> > rebalance, but for another example, it seems like we want the brokers to
> be
> > constantly monitoring warmup tasks and processing lag updates to know
> when
> > a warmup task is to be switched, which is heavy work. Why not just let
> the
> > client who owns the warmup and knows the lag decide when it has warmed up
> > and signal the broker? It's a small thing for a client to check when its
> > task gets within the acceptable recovery lag and notify via the heartbeat
> > if so, but it seems like a rather big deal for the broker to be
> constantly
> > computing these checks for every task on every hb. Just food for thought,
> > I  don't really have a strong preference over where the warmup decision
> > happens -- simply pointing it out that it fits very neatly into the
> > "assignor reason"/"rebalance requested" framework. Making this a
> > client-side decision also makes it easier to evolve and customize since
> > Kafka Streams users tend to have more control over upgrading/implementing
> > stuff in their client application vs the brokers (which is often a
> > different team altogether)
> >
> > S20.
> > To what extent do we plan to reuse implementations of 848 in order to
> build
> > the new Streams group protocol? I'm still having some trouble
> understanding
> > where we are just adding stuff on top and where we plan to rebuild from
> the
> > ground up. For example we seem to share most of the epoch reconciliation
> > stuff but will have different triggers for the group epoch bump. Is it
> > possible to just "plug in" the group epoch and let the consumer's group
> > coordinator figure stuff out from there? Will we have a completely
> distinct
> > Streams coordinator? I know this is an implementation detail but imo it's
> > ok to get in the weeds a bit for a large and complex KIP such as this.
> >
> > Setting aside the topology initialization stuff, is the main difference
> > really just in how the group coordinator will be aware of tasks for
> Streams
> > groups vs partitions for consumer groups?
> >
> > Which brings me to a high level question I feel I have to ask:
> >
> > S21.
> > Can you go into more detail on why we have to provide Kafka Streams its
> own
> > RPCs as opposed to just utilizing the protocol from KIP-848? The Rejected
> > Alternatives does address this but the main argument there seems to be in
> > implementing a broker-side assignor. But can't you do this with 848? And
> > given there are users who rarely upgrade their brokers just as there are
> > users who rarely upgrade their clients, it seems silly to do all this
> work
> > and change the fundamental direction of Kafka Streams just to give more
> > control over the assignment to the brokers.
> >
> > For a specific example, the KIP says "Client-side assignment makes it
> hard
> > to debug and tune the assignment logic" -- personally, I would find it
> > infinitely more difficult to tune and debug assignment logic if it were
> > happening on the broker. But that's just because I work with the app
> devs,
> > not the cluster operators. The point is we shouldn't design an entire
> > protocol to give preference to one vendor or another -- nobody wants the
> > ASF to start yelling at us  lol D:
> >
> > Another example, which imo is a good one (IIUC) is that "a simple
> parameter
> > change requires redeploying of all streams clients". That's definitely a
> > huge bummer, but again, why would broker-side assignors not be able to
> > trigger a reassignment based on a custom config in the 848 protocol? We
> > could still have the ha and sticky assignors implemented on the brokers,
> > and could still define all the Streams configs as broker configs to be
> > passed into the streams custom broker assignors. That feels like a lot
> less
> > work than redoing our own group protocol from scratch?
> >
> > I don't want you guys to think I'm trying to block the entire direction
> of
> > this KIP. I'd just like to better understand your thoughts on this since
> I
> > assume you've discussed this at length internally. Just help me
> understand
> > why this huge proposal is justified (for my own sake and to convince any
> > "others" who might be skeptical)
> >
> >
> > On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> > > I still need to catch up on the discussion in general. But as promised,
> > > I just started KIP-1088 about the `KafkaClientSupplier` question.
> > >
> > > Looking forward to your feedback on the new KIP. I hope we can get
> > > KIP-1088 done soon, to not block this KIP.
> > >
> > >
> > > -Matthias
> > >
> > > On 9/4/24 09:52, Lucas Brutschy wrote:
> > > > Hi Sophie,
> > > >
> > > > thanks for the questions and comments!
> > > >
> > > > S1. KIP-1071 does not need to add public interfaces to streams, but
> it
> > > > does not (yet) make the streams protocol client pluggable. Matthias
> > > > promised to write a KIP that will propose a solution for the
> > > > KafkaClientSupplier soon. He suggested keeping it separate from this
> > > > KIP, but we can treat it as a blocker. You are right that we could
> try
> > > > fitting an explicit topic initialization in this interface, if we
> > > > decide we absolutely need it now, although I'm not sure if that would
> > > > actually define a useful workflow for the users to explicitly update
> a
> > > > topology. Explicit initialization cannot be part of admin tools,
> > > > because we do not have the topology there. It could probably be a
> > > > config that defaults to off, or a method on the KafkaStreams object?
> > > > To me, explicit initialization opens a broad discussion on how to do
> > > > it, so I would like to avoid including it in this KIP. I guess it
> also
> > > > depends on the outcome of the discussion in S8-S10.
> > > >
> > > > S2. It's hard to define the perfect default, and we should probably
> > > > aim for safe operations first - since people can always up the limit.
> > > > I changed it to 20.
> > > >
> > > > S11. I was going to write we can add it, but it's actually quite
> > > > independent of KIP-1071 and also a nice newbie ticket, so I would
> > > > propose we treat it as out-of-scope and let somebody define it in a
> > > > separate KIP - any discussion about it would anyway be lost in this
> > > > long discussion thread.
> > > >
> > > > S12. Member epoch behaves the same as in KIP-848. See the KIP-848
> > > > explanation for basics:
> > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup
> > > >
> > > > Together with the section "Reconciliation of the group", this should
> > > > make clear how member epochs, group epochs and assignment epochs work
> > > > in KIP-1071, but I'll try to summarize the reconciliation a bit more
> > > > explicitly below:
> > > >
> > > > Essentially, the broker drives a reconciliation for each member
> > > > separately. The goal is to reconcile the assignment generated by the
> > > > assignor and reach the assignment epoch, but it's an asynchronous
> > > > process, so each member may still be on an older epoch, which is the
> > > > member epoch. We do not reinvent any of the internal machinery for
> > > > streams here, we are just adding standby tasks and warm-up tasks to
> > > > the reconciliation of KIP-848.
> > > >
> > > > During reconciliation, the broker first asks the member to revoke all
> > > > tasks that are not in its target assignment, by removing it from the
> > > > current assignment for the member (which is provided in the heartbeat
> > > > response). Until revocation is completed, the member remains on the
> > > > old member epoch and in `UNREVOKED` state. Only once the member
> > > > confirms revocation of these tasks by removing them from the set of
> > > > owned tasks in the heartbeat request, the member can transition to
> the
> > > > current assignment epoch and get new tasks assigned. Once the member
> > > > has revoked all tasks that are not in its target assignment, it
> > > > transitions to the new epoch and gets new tasks assigned. There are
> > > > some tasks that may be in the (declarative) target assignment of the
> > > > member generated by the assignor, but that may not be immediately
> > > > added to the current assignment of the member, because they are still
> > > > "unreleased". Unreleased tasks are:
> > > >
> > > > - A. For any task, another instance of the same process may own the
> > > > task in any role (active/standby/warm-up), and we are still awaiting
> > > > revocation.
> > > >
> > > > - B. For an active task, an instance of any process may own the same
> > > > active task, and we are still awaiting revocation.
> > > >
> > > > As long as there is at least one unreleased task, the member remains
> > > > in state UNRELEASED. Once all tasks of the target assignment are
> added
> > > > to the current assignment of the member (because they were released),
> > > > the member transitions to STABLE.
> > > >
> > > > Technically, the first constraint (we cannot assign a task in two
> > > > different roles to the same process) is only relevant for stateful
> > > > tasks that use the local state directory. I wonder if we should relax
> > > > this restriction slightly, by marking sub-topologies that access the
> > > > local state directory. This information could also be used by the
> > > > assignor. But we can also introduce as a follow-up improvement.
> > > >
> > > > S13i. You are right, these bumps of the group epoch are "on top" of
> > > > the KIP-848 reasons for bumping the group epoch. I added the KIP-848
> > > > reasons as well, because this was a bit confusing in the KIP.
> > > >
> > > > S13ii. The group epoch is bumped immediately once a warm-up task has
> > > > caught up, since this is what triggers running the assignor. However,
> > > > the assignment epoch is only bumped once the task assignor has run.
> If
> > > > the task assignment for a member hasn't changed, it will transition
> to
> > > > the new assignment epoch immediately.
> > > >
> > > > S13iii. In the case of a rolling bounce, the group epoch will be
> > > > bumped for every member that leaves and joins the group. For updating
> > > > rack ID, client tags, topology ID etc. -- this will mostly play a
> role
> > > > for static membership, as you mentioned, but would cause the same
> > > > thing - in a rolling bounce with static membership, we will attempt
> to
> > > > recompute the assignment after every bounce, if the topology ID is
> > > > changed in the members. This can cause broker load, as you mentioned.
> > > > Note, however, that we don't have to compute an assignment for every
> > > > group epoch bump. While KIP-848 computes its assignment as part of
> the
> > > > main loop of the group coordinator, it uses asynchronous assignment
> in
> > > > the case of client-side assignment, where the assignment is slower.
> So
> > > > it will kick off the assignment on the client side, enter state
> > > > `ASSIGNING`, but may have more group epoch bumps in the meantime
> while
> > > > the assignment is computed on the client side. We leave it open to
> use
> > > > the same mechanism on the broker to compute assignment asynchronously
> > > > on a separate thread, in case streams assignment is too slow for the
> > > > main loop of the group coordinator. We will also consider
> > > > rate-limiting the assignor to reduce broker load. However, it's too
> > > > early to define this in detail, as we don't know much about the
> > > > performance impact yet.
> > > >
> > > > S13iv. The heartbeat just sends "null" instead of the actual value in
> > > > case the value didn't change since the last heartbeat, so no
> > > > comparison is required. This is defined in the heartbeat RPC and the
> > > > same as in KIP-848.
> > > >
> > > > S13v. I think I disagree. The group epoch represents a version of the
> > > > group state, including the current topology and the metadata of all
> > > > members -- the whole input to the task assignor -- and defines
> exactly
> > > > the condition under which we attempt to recompute the assignment. So
> > > > it's not overloaded, and has a clear definition. It's also exactly
> the
> > > > same as in KIP-848, and we do not want to move away too far from that
> > > > protocol, unless it's something that is specific to streams that need
> > > > to be changed. Also, the protocol already uses three types of epochs
> -
> > > > member epoch, assignment epoch, group epoch. I don't think adding
> more
> > > > epochs will make things clearer. Unless we have a strong reason why
> we
> > > > need to add extra epochs, I'm not in favor.
> > > >
> > > > S14. Assignment epoch is the same as in KIP-848:
> > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment
> > > >
> > > > Its definition does not change at all, so we didn't describe it in
> > > > detail in KIP-1071.
> > > >
> > > > S15. I agree that the name is a bit generic, but the time to change
> > > > this was when `group.instance.id` was introduced. Also, if we start
> > > > diverging from regular consumers / consumer groups in terms of
> naming,
> > > > it will not make things clearer. "application.id" and "group.id" are
> > > > bad enough, in my eyes. I hope it's okay if I just improve the field
> > > > documentation.
> > > >
> > > > S16. This goes back to KIP-482. Essentially, in flexible request
> > > > schema versions you can define optional fields that will be encoded
> > > > with a field tag on the wire, but can also be completely omitted.
> > > > This is similar to how fields are encoded in, e.g., protobuf. It
> > > > improves schema evolution, because we can add new fields to the
> schema
> > > > without bumping the version. There is also a minor difference in
> > > > encoding size: when encoded, tagged fields take up more bytes on the
> > > > wire because of the field tag, but they can be omitted completely. It
> > > > will become interesting when we want to include extra data after the
> > > > initial protocol definition.
> > > >
> > > > Cheers,
> > > > Lucas
> > > >
> > > > On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman
> > > > <sop...@responsive.dev> wrote:
> > > >>
> > > >> Ah, my bad -- I thought I refreshed the page to get the latest
> version
> > > >> which is why I was a bit confused when I couldn't find anything
> about
> > > the
> > > >> new tools which I had previously seen in the KIP. Sorry for the
> > > confusion
> > > >> and unnecessary questions
> > > >>
> > > >> S1.
> > > >>
> > > >>> You could imagine calling the initialize RPC
> > > >>> explicitly (to implement explicit initialization), but this would
> > > >>> still mean sending an event to the background thread, and the
> > > >>> background thread in turn invokes the RPC. However, explicit
> > > >>> initialization would require some additional public interfaces
> that we
> > > >>> are not including in this KIP.
> > > >>
> > > >> I'm confused -- if we do all the group management stuff in a
> background
> > > >> thread to avoid needing public APIs, how do we pass all the
> > > >> Streams-specific and app-specific info to the broker for the
> > > >> StreamsGroupInitialize? I am guessing this is where we need to
> invoke
> > > >> internal APIs and is related to our discussion about removing the
> > > >> KafkaClientSupplier. However, it seems to me that no matter how you
> > > look at
> > > >> it, Kafka Streams will need to pass information to and from the
> > > background
> > > >> thread if the background thread is to be aware of things like tasks,
> > > offset
> > > >> sums, repartition topics, etc
> > > >>
> > > >> We don't really need to rehash everything here since it seems like
> the
> > > >> proposal to add a StreamsConsumer/StreamsAssignment interface will
> > > address
> > > >> this, which I believe we're in agreement on. I just wanted to point
> out
> > > >> that this work to replace the KafkaClientSupplier is clearly
> intimately
> > > >> tied up with KIP-1071, and should imho be part of this KIP and not
> its
> > > own
> > > >> standalone thing.
> > > >>
> > > >> By the way I'm happy to hop on a call to help move things forward on
> > > that
> > > >> front (or anything really). Although I guess we're just waiting on a
> > > design
> > > >> right now?
> > > >>
> > > >> S2. I was suggesting something lower for the default upper limit on
> all
> > > >> groups. I don't feel too strongly about it, just wanted to point out
> > > that
> > > >> the tradeoff is not about the assignor runtime but rather about
> resource
> > > >> consumption, and that too many warmups could put undue load on the
> > > cluster.
> > > >> In the end if we want to trust application operators then I'd say
> it's
> > > fine
> > > >> to use a higher cluster-wide max like 100.
> > > >>
> > > >> S8-10 I'll get back to you on this in another followup since I'm
> still
> > > >> thinking some things through and want to keep the discussion
> rolling for
> > > >> now. In the meantime I have some additional questions:
> > > >>
> > > >> S11. One of the main issues with unstable assignments today is the
> fact
> > > >> that assignors rely on deterministic assignment and use the process
> Id,
> > > >> which is not configurable and only persisted via local disk in a
> > > >> best-effort attempt. It would be a very small change to include
> this in
> > > >> KIP-1071 (like 5 LOC), WDYT? (Would even be willing to do the PR for
> > > this
> > > >> myself so as not to add to your load). There's a ticket for it here:
> > > >> https://issues.apache.org/jira/browse/KAFKA-15190
> > > >>
> > > >> S12. What exactly is the member epoch and how is it defined? When
> is it
> > > >> bumped? I see in the HB request field that certain values signal a
> > > member
> > > >> joining/leaving, but I take it there are valid positive values
> besides
> > > the
> > > >> 0/-1/-2 codes? More on this in the next question:
> > > >>
> > > >> S13. The KIP says the group epoch is updated in these three cases:
> > > >> a. Every time the topology is updated through the
> > > StreamsGroupInitialize API
> > > >> b. When a member with an assigned warm-up task reports a task
> changelog
> > > >> offset and task changelog end offset whose difference is less that
> > > >> acceptable.recovery.lag.
> > > >> c. When a member updates its topology ID, rack ID, client tags or
> > > process
> > > >> ID. Note: Typically, these do not change within the lifetime of a
> > > Streams
> > > >> client, so this only happens when a member with static membership
> > > rejoins
> > > >> with an updated configuration.
> > > >>
> > > >> S13.i First, just to clarify, these are not the *only* times the
> group
> > > >> epoch is bumped but rather the additional cases on top of the
> regular
> > > >> consumer group protocol -- so it's still bumped when a new member
> joins
> > > or
> > > >> leaves, etc, right?
> > > >> S13.ii Second, does (b) imply the epoch is bumped just whenever the
> > > broker
> > > >> notices a task has finished warming up, or does it first reassign
> the
> > > task
> > > >> and then bump the group epoch as a result of this task reassignment?
> > > >> S13.iii Third, (a) mentions the group epoch is bumped on each
> > > >> StreamsGroupInitialize API but (c) says it gets bumped whenever a
> member
> > > >> updates its topology ID. Does this mean that in the event of a
> rolling
> > > >> upgrade that changes the topology, the group epoch is bumped just
> once
> > > or
> > > >> for each member that updates its topology ID? Or does the "updating
> its
> > > >> topology ID" somehow have something to do with static membership?
> > > >> S13.iv How does the broker know when a member has changed its rack
> ID,
> > > >> client tags, etc -- does it compare these values on every hb?? That
> > > seems
> > > >> like a lot of broker load. If this is all strictly for static
> membership
> > > >> then have we considered leaving static membership support for a
> followup
> > > >> KIP? Not necessarily advocating for that, just wondering if this was
> > > >> thought about already. Imo it would be acceptable to not support
> static
> > > >> membership in the first version (especially if we fixed other issues
> > > like
> > > >> making the process Id configurable to get actual stable
> assignments!)
> > > >> S13.v I'm finding the concept of group epoch here to be somewhat
> > > >> overloaded. It sounds like it's trying to keep track of multiple
> things
> > > >> within a single version: the group membership, the topology
> version, the
> > > >> assignment version, and various member statuses (eg rack ID/client
> > > tags).
> > > >> Personally, I think it would be extremely valuable to unroll all of
> this
> > > >> into separate epochs with clear definitions and boundaries and
> triggers.
> > > >> For example, I would suggest something like this:
> > > >>
> > > >> group epoch: describes the group at a point in time, bumped when
> set of
> > > >> member ids changes (ie static or dynamic member leaves or joins) --
> > > >> triggered by group coordinator noticing a change in group membership
> > > >> topology epoch: describes the topology version, bumped for each
> > > successful
> > > >> StreamsGroupInitialize that changes the broker's topology ID --
> > > triggered
> > > >> by group coordinator bumping the topology ID (ie
> StreamsGroupInitialize)
> > > >> (optional) member epoch: describes the member version, bumped when a
> > > >> memberId changes its rack ID, client tags, or process ID (maybe
> topology
> > > >> ID, not sure) -- is this needed only for static membership? Going
> back
> > > to
> > > >> question S12, when is the member epoch is bumped in the current
> > > proposal?
> > > >> assignment epoch: describes the assignment version, bumped when the
> > > >> assignor runs successfully (even if the actual assignment of tasks
> does
> > > not
> > > >> change) -- can be triggered by tasks completing warmup, a manual
> > > >> client-side trigger (future KIP only), etc (by definition is bumped
> any
> > > >> time any of the other epochs are bumped since they all trigger a
> > > >> reassignment)
> > > >>
> > > >> S14. The KIP also mentions a new concept of an  "assignment epoch"
> but
> > > >> doesn't seem to ever elaborate on how this is defined and what it's
> used
> > > >> for. I assume it's bumped each time the assignment changes and
> > > represents a
> > > >> sort of "assignment version", is that right? Can you describe in
> more
> > > >> detail the difference between the group epoch and the assignment
> epoch?
> > > >>
> > > >> S15. I assume the "InstanceId" field in the HB request corresponds
> to
> > > the
> > > >> group.instance.id config of static membership. I always felt this
> field
> > > >> name was too generic and easy to get mixed up with other
> identifiers,
> > > WDYT
> > > >> about "StaticId" or "StaticInstanceId" to make the static membership
> > > >> relation more explicit?
> > > >>
> > > >> S16. what is the "flexibleVersions" field in the RPC protocols? I
> assume
> > > >> this is related to versioning compatibility but it's not quite clear
> > > from
> > > >> the KIP how this is to be used. For example if we modify the
> protocol by
> > > >> adding new fields at the end, we'd bump the version but should the
> > > >> flexibleVersions field change as well?
> > > >>
> > > >> On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy
> > > >> <lbruts...@confluent.io.invalid> wrote:
> > > >>
> > > >>> Hi Sophie,
> > > >>>
> > > >>> Thanks for your detailed comments - much appreciated! I think you
> read
> > > >>> a version of the KIP that did not yet include the admin
> command-line
> > > >>> tool and the Admin API extensions, so some of the comments are
> already
> > > >>> addressed in the KIP.
> > > >>>
> > > >>> S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in
> the
> > > >>> consumer background thread. Note that in the new consumer threading
> > > >>> model, all RPCs are run by the background thread. Check out this:
> > > >>>
> > > >>>
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
> > > >>> for more information. Both our RPCs are just part of the group
> > > >>> membership management and do not need to be invoked explicitly by
> the
> > > >>> application thread. You could imagine calling the initialize RPC
> > > >>> explicitly (to implement explicit initialization), but this would
> > > >>> still mean sending an event to the background thread, and the
> > > >>> background thread in turn invokes the RPC. However, explicit
> > > >>> initialization would require some additional public interfaces
> that we
> > > >>> are not including in this KIP. StreamsGroupDescribe is called by
> the
> > > >>> AdminClient, and used by the command-line tool
> > > >>> kafka-streams-groups.sh.
> > > >>>
> > > >>> S2. I think the max.warmup.replicas=100 suggested by Nick was
> intended
> > > >>> as the upper limit for setting the group configuration on the
> broker.
> > > >>> Just to make sure that this was not a misunderstanding. By default,
> > > >>> values above 100 should be rejected when setting a specific value
> for
> > > >>> group. Are you suggesting 20 or 30 for the default value for
> groups,
> > > >>> or the default upper limit for the group configuration?
> > > >>>
> > > >>> S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The
> > > >>> MemberEpoch=-1 is a left-over from an earlier discussion. It means
> > > >>> that the member is leaving the group, so the intention was that the
> > > >>> member must leave the group when it asks the other members to shut
> > > >>> down. We later reconsidered this and decided that all applications
> > > >>> should just react to the shutdown application signal that is
> returned
> > > >>> by the broker, so the client first sets the ShutdownApplication and
> > > >>> later leaves the group. Thanks for spotting this, I removed it.
> > > >>>
> > > >>> S4. Not sure if this refers to the latest version of the KIP. We
> added
> > > >>> an extension of the admin API and a kafka-streams-groups.sh
> > > >>> command-line tool to the KIP already.
> > > >>>
> > > >>> S5. All RPCs for dealing with offsets will keep working with
> streams
> > > >>> groups. The extension of the admin API is rather cosmetic, since
> the
> > > >>> method names use "consumer group". The RPCs, however, are generic
> and
> > > >>> do not need to be changed.
> > > >>>
> > > >>> S6. Yes, you can use the DeleteGroup RPC with any group ID, whether
> > > >>> streams group or not.
> > > >>>
> > > >>> S7. See the admin API section.
> > > >>>
> > > >>> S8. I guess for both A and B, I am not sure what you are
> suggesting.
> > > >>> Do you want to make the broker-side topology immutable and not
> include
> > > >>> any information about the topology, like the topology ID in the
> RPC?
> > > >>> It would seem that this would be a massive food-gun for people, if
> > > >>> they start changing their topology and don't notice that the
> broker is
> > > >>> looking at a completely different version of the topology. Or did
> you
> > > >>> mean that there is some kind of topology ID, so that at least we
> can
> > > >>> detect inconsistencies between broker and client-side topologies,
> and
> > > >>> we fence out any member with an incorrect topology ID? Then we
> seem to
> > > >>> end up with mostly the same RPCs and the same questions (how is the
> > > >>> topology ID generated?). I agree that the latter could be an
> option.
> > > >>> See summary at the end of this message.
> > > >>>
> > > >>> S9. If we flat out refuse topology updates, agree - we cannot let
> the
> > > >>> application crash on minor changes to the topology. However, if we
> > > >>> allow topology updates as described in the KIP, there are only
> upsides
> > > >>> to making the topology ID more sensitive. All it will cause is
> that a
> > > >>> client will have to resend a `StreamsGroupInitialize`, and during
> the
> > > >>> rolling bounce, older clients will not get tasks assigned.
> > > >>>
> > > >>> S10. The intention in the KIP is really just this - old clients can
> > > >>> only retain tasks, but not get new ones. If the topology has
> > > >>> sub-topologies, these will initially be assigned to the new
> clients,
> > > >>> which also have the new topology. You are right that rolling an
> > > >>> application where the old structure and the new structure are
> > > >>> incompatible (e.g. different subtopologies access the same
> partition)
> > > >>> will cause problems. But this will also cause problems in the
> current
> > > >>> protocol, so I'm not sure, if it's strictly a regression, it's just
> > > >>> unsupported (which we can only make the best effort to detect).
> > > >>>
> > > >>> In summary, for the topology ID discussion, I mainly see two
> options:
> > > >>> 1) stick with the current KIP proposal
> > > >>> 2) define the topology ID as the hash of the StreamsGroupInitialize
> > > >>> topology metadata only, as you suggested, and fence out members
> with
> > > >>> an incompatible topology ID.
> > > >>> I think doing 2 is also a good option, but in the end it comes
> down to
> > > >>> how important we believe topology updates (where the set of
> > > >>> sub-topologies or set of internal topics are affected) to be. The
> main
> > > >>> goal is to make the new protocol a drop-in replacement for the old
> > > >>> protocol, and at least define the RPCs in a way that we won't need
> > > >>> another KIP which bumps the RPC version to make the protocol
> > > >>> production-ready. Adding more command-line tools, or public
> interfaces
> > > >>> seems less critical as it doesn't change the protocol and can be
> done
> > > >>> easily later on. The main question becomes - is the protocol
> > > >>> production-ready and sufficient to replace the classic protocol if
> we
> > > >>> go with 2?
> > > >>>
> > > >>> Cheers,
> > > >>> Lucas
> > > >>>
> > >
>
>

Reply via email to