Hi Sophie,
thanks for getting deeply involved in this discussion.
S16. Adding tagged fields would not require an RPC version bump at
all. This should already cover a lot of use cases where request
versioning wouldn't become an issue, as long as it is safe to ignore
the new fields. Other than that, I'm not aware of a formal definition
of what request version bumps mean in relation to Apache Kafka
versions - if anybody knows, please let me know. In my understanding,
the Kafka protocol evolves independently of AK versions, and for the
Kafka protocol to change, you require an accepted KIP, and not an AK
release. The grey area is that a protocol change without an AK release
does not mean much, because even if cloud vendors deploy it before an
AK release, virtually no clients would support it and the protocol
would fall back to the earlier RPC version. So maybe this isn't that
clearly formalized because it does not matter much in practice.
Normally, I would expect an RPC version bump after each KIP that
breaks RPC compatibility. But there seems to be some flexibility here
- for example, the KIP-848 RPCs were changed after the fact in trunk,
because they were not exposed yet by default in AK (or any other
implementation of the Kafka protocol), so a breaking change without a
request version bump was probably not deemed problematic.
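Just to make the tagged-field point concrete, here is a trimmed,
purely illustrative sketch of what adding such a field to a
flexible-version RPC schema could look like (not the actual KIP-1071
schema; the new field name and tag number are made up):

{
  "name": "StreamsGroupHeartbeatRequest",
  "validVersions": "0",
  "flexibleVersions": "0+",
  "fields": [
    { "name": "GroupId", "type": "string", "versions": "0+",
      "about": "The group identifier." },
    // Tagged field added later: no version bump needed, because brokers and
    // clients that do not know the tag simply skip it. Name and tag are made up.
    { "name": "SomeNewOptionalField", "type": "int32", "versions": "0+",
      "taggedVersions": "0+", "tag": 0,
      "about": "Illustrative optional field added after the initial definition." }
  ]
}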
S17/18. We can add an informational table for the interface to the
assignor, and that should also make clear how a client-side assignment
interface would look. Generally, our idea would be to send all the
input information for the task assignor to a client and expect all the
output information of a task assignor in return, very similar to
KIP-848. We did not include such a table because we do not define a
public interface for custom broker-side assignors in this KIP.
But generally, the input / output of the (task) assignor mostly
follows the data fields sent to / from the brokers. Let me do that,
but first I'll respond to the rest of your questions that seem more
important.
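In the meantime, to give a rough idea of what such a table would
contain, here is a purely illustrative Java sketch of the assignor's
inputs and outputs (the names are made up - as said, the KIP does not
propose a public assignor interface - but the fields mirror what is
exchanged in the heartbeats):

import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch only - not a proposed public interface.
public interface TaskAssignor {

    record TaskId(String subtopologyId, int partition) {}

    // Input: one entry per member, mirroring the heartbeat request data
    // (process ID, rack ID, client tags, task offsets / end offsets for lag).
    record MemberSpec(String processId,
                      Optional<String> rackId,
                      Map<String, String> clientTags,
                      Map<TaskId, Long> taskOffsets,
                      Map<TaskId, Long> taskEndOffsets) {}

    // Input: all members keyed by member ID, plus the initialized topology.
    record GroupSpec(Map<String, MemberSpec> members, String topologyId) {}

    // Output: the declarative target assignment per member, mirroring the
    // heartbeat response data (active, standby and warm-up tasks).
    record MemberAssignment(Set<TaskId> activeTasks,
                            Set<TaskId> standbyTasks,
                            Set<TaskId> warmupTasks) {}

    record GroupAssignment(Map<String, MemberAssignment> assignments) {}

    GroupAssignment assign(GroupSpec group);
}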
S19. I think an explicit trigger for rebalance could make sense, but
let me explain the thinking why we haven't included this in the
current KIP: We don't want to include it for client-side assignment
obviously, because this KIP does not include client-side assignment.
For triggering a rebalance when a warm-up has caught up, this is
treated differently in the KIP: the task assignor is called when a
client sends updated task offsets. The client does not update its task
offsets regularly, but according to an interval
`task.offset.interval.ms`, but also immediately when a client notices
that a warm-up has caught up. So effectively, it is mostly the client
that is constantly checking the lag. I absolutely believe that the
client should be checking for `acceptable.recovery.lag`, and we are
using the presence of the task offset field to trigger reassignment.
What we weren't sure about is how often the task offsets need to be
updated and how often we want to retrigger the assignment because of
that. An explicit field for "forcing" a reassignment may still make
sense, so that we can update the task offsets on the broker more often
than we call the assignor. However, I'm not convinced that this should
just be a generic "rebalance trigger", as this seems to be going
against the goals of KIP-848. Generally, I think one of the principles
of KIP-848 is that a member just describes its current state and
metadata, and the group coordinator makes a decision on whether
reassignment needs to be triggered. So it would seem more natural to
just indicate a boolean "I have caught-up warm-up tasks" to the group
coordinator. I understand that for client-side assignment, you may
want to trigger assignment explicitly without the group coordinator
understanding why, but then we should add such a field as part of the
client-side assignment KIP.
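Just to illustrate the client-side part of this (hypothetical names,
not the actual client internals): the idea is that the client checks
the lag locally and only attaches fresh task offsets to a heartbeat
when the reporting interval has elapsed or a warm-up has caught up,
roughly like this:

import java.util.Map;

// Hypothetical sketch of the client-side decision - names and structure are
// illustrative, not the actual KIP-1071 client code.
class TaskOffsetReporter {
    private final long acceptableRecoveryLag; // acceptable.recovery.lag
    private final long reportIntervalMs;      // task.offset.interval.ms
    private long lastReportMs = -1;

    TaskOffsetReporter(long acceptableRecoveryLag, long reportIntervalMs) {
        this.acceptableRecoveryLag = acceptableRecoveryLag;
        this.reportIntervalMs = reportIntervalMs;
    }

    // Decide whether the next heartbeat should carry fresh task offsets; the
    // presence of the task offset field is what triggers reassignment broker-side.
    boolean shouldReportOffsets(long nowMs, Map<String, Long> warmupLagByTask) {
        boolean intervalElapsed = lastReportMs < 0 || nowMs - lastReportMs >= reportIntervalMs;
        boolean warmupCaughtUp = warmupLagByTask.values().stream()
                .anyMatch(lag -> lag <= acceptableRecoveryLag);
        if (intervalElapsed || warmupCaughtUp) {
            lastReportMs = nowMs;
            return true;
        }
        return false;
    }
}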
S20. Yes, inside the group coordinator, the main differences are
topology initialization handling, and the semantics of task assignment
vs. partition assignment. It will not be a completely separate group
coordinator, and we will not change things like the offset reading and
writing. What will have its own code path is the handling of
heartbeats and the reconciliation of the group (so, which tasks are
sent to which member at which point in time until we reach the
declarative target assignment that was produced by a client-side or
server-side assignor). Also, the assignor implementations will
obviously be completely separate from the consumer group assignors,
but that is already the case currently. Streams groups will co-exist
in the group coordinator with share groups (for queues), classic
groups (old generic protocol for consumers and connect) and consumer
groups (from KIP-848), and potentially a new group type for connect as
well. Each of these group types has a separate code path for
heartbeating, reconciling assignments, keeping track of the internal state
of the group, and responding to specific requests from the admin API, like
ConsumerGroupDescribe. For streams, there will be a certain amount of
duplication from consumer groups - for example, liveness tracking for
members (setting timers for the session timeout etc.) is essentially
the same between both groups, and the reconciliation logic is quite
similar. We will make an effort to refactor as much of this code as
possible so that it can be reused by both group types, and a lot of things
have already been made generic with the introduction of share groups
in the group coordinator.
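To give a very rough picture of that separation (purely illustrative
Java, not the actual coordinator code structure):

// Purely illustrative - each group type gets its own heartbeat /
// reconciliation path, while offset handling and timers are shared.
final class GroupCoordinatorSketch {
    enum GroupType { CLASSIC, CONSUMER, SHARE, STREAMS }

    void onHeartbeat(GroupType type) {
        switch (type) {
            case CLASSIC  -> classicGroupHeartbeat();   // old generic protocol (consumers, connect)
            case CONSUMER -> consumerGroupHeartbeat();  // KIP-848 path, partition reconciliation
            case SHARE    -> shareGroupHeartbeat();     // queues
            case STREAMS  -> streamsGroupHeartbeat();   // KIP-1071 path, task reconciliation
        }
    }

    private void classicGroupHeartbeat() {}
    private void consumerGroupHeartbeat() {}
    private void shareGroupHeartbeat() {}
    private void streamsGroupHeartbeat() {}
}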
S21. It's nice that we are discussing "why are we doing this" in
question 21 :D. I think we are touching upon the reasoning in the KIP,
but let me explain the reasoning in more detail. You are right, the
reason we want to implement custom RPCs for Kafka Streams is not about
broker-side vs. client-side assignment. The broker could implement the
Member Metadata versioning scheme of Kafka Streams, intercept consumer
group heartbeats that look like Kafka Streams heartbeats, deserialize
the custom metadata and run a broker-side assignor for Kafka Streams,
and finally serialize custom Kafka Streams assignment metadata, all
based on KIP-848 RPCs.
First of all, the main difference we see with custom RPCs is really
that streams groups become a first-class concept for both the brokers
and clients of Kafka. This also ties into the answer for S20 (what
is different beside topology initialization and task reconciliation).
With streams groups, I can use the Admin API call to see the current
assignment, the target assignment, all the metadata necessary for
assignment, and some information about the current topology in a
human-readable and machine-readable form, instead of having to parse
it from logs on some client machine that may or may not have the right
log-level configured. This will help humans to understand what is
going on, but can also power automated tools / CLIs / UIs, and I think
this is a major improvement for operating a streams application.
Again, this is not about who is in control of the broker or the
clients; the information becomes accessible to everybody who can call
the Admin API. It also does not depend on the cloud vendor; it will be
an improvement available to pure AK users, and will also work with
client-side assignment (although obviously, if there is an intention
to start encoding proprietary data in black-box byte arrays, this
information remains hidden from users of the open source distribution
again).
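As a concrete (and hypothetical) example of what this could look like
for an operator, using the Admin API extension proposed in the KIP -
the exact method name and result types may still differ:

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

public class DescribeStreamsGroup {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // describeStreamsGroups() is the Admin API extension proposed in the
            // KIP; the method name and result type here are illustrative.
            var result = admin.describeStreamsGroups(List.of("my-streams-app")).all().get();
            // Members, current/target assignment and topology information become
            // directly inspectable, instead of being scraped from client logs.
            System.out.println(result);
        }
    }
}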
I also think that the original proposal of reusing the new consumer
group protocol for Kafka Streams has some downsides that, while not
deal-breakers, would make the new behaviour of Kafka Streams
less than ideal. One example is the reconciliation: if we have a
custom stream partition assignor (client- or broker-side doesn't
matter), we can only control the target assignment that the group
converges to, but none of the intermediate states that the group
transitions through while attempting to reach the desired end state.
For example, there is no way to prevent the reconciliation loop of
KIP-848 from giving an active task to processX before another instance
on processX has revoked the corresponding standby task. There is no
synchronization point where these movements are done at the same time
as in the classic protocol, and on the broker we can't prevent this,
because the reconciliation loop in KIP-848 does not know what a
standby task is, or what a process ID is. You could argue that we can let
the application run into LockExceptions in the meantime, but that will
be one of the things that make Kafka Streams work less than
ideal. Another example is task offsets. It is beneficial for streams
partition assignors to have relatively recent task offset information,
which, in KIP-848, needs to be sent to the assignor using a black-box
MemberMetadata byte array. This member metadata byte array is
persisted to the consumer offset topic every time it changes (for
failover). So we end up with a bad trade-off: if we send the task
offset information to the broker very frequently, we will have to
write those member metadata records very frequently as well. If we do
not send them frequently, streams task assignors will have very
outdated lag information. In KIP-1071 we can make the decision to not
persist the task offset information, which avoids this trade-off. It
gives us up-to-date lag information in the usual case, but after
fail-over of the group coordinator, our lag information may be missing
for a short period of time. These are two examples, but I think the
general theme is that, with custom RPCs, Kafka Streams is in control
of how the group metadata is processed and how the tasks are
reconciled. This is not just a consideration for now, but also about the
future: will KIP-848 reconciliation and metadata management always
work "good enough" for Kafka Streams, or will there be use cases where
we want to evolve and customize the behaviour of Kafka Streams groups
independently of consumer groups?
A final point that I would like to make: I don't think that custom
streams RPCs are inherently a lot harder to do for the Kafka Streams
community than "just utilizing" KIP-848, at least not as much harder as you
would think. Many things in KIP-848, such as customizable assignment
metadata with a metadata versioning scheme, are made to look like
general-purpose mechanisms, but are effectively mechanisms for Kafka
Streams, and are also completely unimplemented at this point in any
implementation of the Kafka protocol.
little use of custom partition assignors in Kafka besides Kafka
Streams, and we can expect the same to be true of these new
"customizable" features of KIP-848. So, besides the Kafka Streams
community, there is no one driving the implementation of these
features. "Just utilizing" KIP-848 for Kafka Streams would
effectively mean to implement all this, and still make a lot of deep
changes to Kafka Streams (as mechanisms like probing rebalances etc.
don't make sense in KIP-848). I think adapting KIP-848 to work with
Kafka Streams either way is a big effort. Piggybacking on consumer
group RPCs and extending the consumer group protocol to the point that
it can carry Kafka Streams on its back would not make things
significantly easier. Some things, like online migration from the old
protocol to the new protocol or reimplementing metadata versioning on
top of KIP-848, may even be harder in KIP-848 than in KIP-1071. I
don't think we can consider consumer group RPCs as a shortcut to
revamp the streams rebalance protocol, but should be focussing on what
makes Kafka Streams work best down the road.
Hope that makes sense!
Cheers,
Lucas
On Fri, Sep 6, 2024 at 6:47 AM Sophie Blee-Goldman
<sop...@responsive.dev> wrote:
Thanks for another detailed response Lucas! Especially w.r.t how the
epochs
are defined. I also went back and re-read KIP-848 to refresh myself, I
guess it wasn't clear to me how much of the next-gen consumer protocol we
are reusing vs what's being built from the ground up.
S16.
It will become interesting when we want to include extra data after
the
initial protocol definition.
I guess that's what I'm asking us to define -- how do we evolve the
schema
when expanding the protocol? I think we need to set a few public
contracts
here, such as whether/which versions get bumped when. For example if I
add/modify fields twice within a single Kafka release, do we still need
to
update the version? I think the answer is "yes" since I know you
Confluent
folks upgrade brokers more often than AK releases, but this is different
from how eg the SubscriptionInfo/AssignmentInfo of the
StreamsPartitionAssignor works today, hence I want to put all that in
writing (for those of us who aren't familiar with how all this works
already and may be modifying it in the future, say, to add client-side
assignment :P)
Maybe a quick example of what to do to evolve this schema is sufficient?
S17.
The KIP seems to throw the assignor, group coordinator, topic
initialization, and topology versioning all into a single black box on
the
broker. I understand much of this is intentionally being glossed over as
an
implementation detail since it's a KIP, but it would really help to tease
apart these distinct players and define their interactions. For example,
what are the inputs and outputs to the assignor? For example could we
get a
table like the "Data Model
<
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=217387038#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-DataModel
"
in 848?
KIP-848 goes into detail on these things, but again, it's unclear what
parts of 848 are and aren't supposed to be carried through. For example
848
has a concept of assignor "reasons" and client-side assignors and
assignor
metadata which can all trigger a rebalance/group epoch bump. However in
the
current plan for 1071 none of these exist. Which leads me to my next
point...
S18.
Many people use a managed Kafka service like Confluent Cloud where
brokers
are on the bleeding edge but clients are upgraded less frequently.
However,
the opposite is also true for a great many users: client applications are
easy to upgrade and bumped often to get new features, whereas the brokers
(often run by a different team altogether) are considered riskier to
upgrade and stay on one version for a long time. We can argue about which
case is more common -- in my personal experience it's actually the
latter,
although I only realized this after my time at Confluent which was
obviously skewed towards meeting Confluent Cloud users -- but regardless,
we obviously need to support both cases as first-class citizens. And I'm
a
bit concerned that there's not enough in the KIP to ensure that
client-side
assignment won't end up getting hacked into the protocol as an
afterthought.
Don't get me wrong, I'm still not demanding you guys implement
client-side
assignors as part of this KIP. I'm happy to leave the implementation of
that to a followup, but I feel we need to flesh out the basic direction
for
this right now, as part of the v0 Streams protocol.
I'm also not asking you to do this alone, and am happy to work with/for
you
on this. I have one possible proposal sketched out already, although
before
I share that I just wanted to make sure we're aligned on the fundamentals
and have mapped out the protocol in full first (ie S17). Just want to
have
a rough plan for how clients-side assignment will fit into the picture,
and
right now it's difficult to define because it's unclear where the
boundaries between the group coordinator and broker-side assignor are (eg
what are assignor inputs and outputs).
I'm committed to ensuring this doesn't add extra work or delay to this
KIP
for you guys.
S19.
I think it could make sense to include a concept like the "assignor
reason" from 848 as a top-level field and rebalance trigger (ie group
epoch bump). This would mainly be for the client-side assignor to
trigger a
rebalance, but for another example, it seems like we want the brokers to
be
constantly monitoring warmup tasks and processing lag updates to know
when
a warmup task is to be switched, which is heavy work. Why not just let
the
client who owns the warmup and knows the lag decide when it has warmed up
and signal the broker? It's a small thing for a client to check when its
task gets within the acceptable recovery lag and notify via the heartbeat
if so, but it seems like a rather big deal for the broker to be
constantly
computing these checks for every task on every hb. Just food for thought,
I don't really have a strong preference over where the warmup decision
happens -- simply pointing it out that it fits very neatly into the
"assignor reason"/"rebalance requested" framework. Making this a
client-side decision also makes it easier to evolve and customize since
Kafka Streams users tend to have more control over upgrading/implementing
stuff in their client application vs the brokers (which is often a
different team altogether)
S20.
To what extent do we plan to reuse implementations of 848 in order to
build
the new Streams group protocol? I'm still having some trouble
understanding
where we are just adding stuff on top and where we plan to rebuild from
the
ground up. For example we seem to share most of the epoch reconciliation
stuff but will have different triggers for the group epoch bump. Is it
possible to just "plug in" the group epoch and let the consumer's group
coordinator figure stuff out from there? Will we have a completely
distinct
Streams coordinator? I know this is an implementation detail but imo it's
ok to get in the weeds a bit for a large and complex KIP such as this.
Setting aside the topology initialization stuff, is the main difference
really just in how the group coordinator will be aware of tasks for
Streams
groups vs partitions for consumer groups?
Which brings me to a high level question I feel I have to ask:
S21.
Can you go into more detail on why we have to provide Kafka Streams its
own
RPCs as opposed to just utilizing the protocol from KIP-848? The Rejected
Alternatives does address this but the main argument there seems to be in
implementing a broker-side assignor. But can't you do this with 848? And
given there are users who rarely upgrade their brokers just as there are
users who rarely upgrade their clients, it seems silly to do all this
work
and change the fundamental direction of Kafka Streams just to give more
control over the assignment to the brokers.
For a specific example, the KIP says "Client-side assignment makes it
hard
to debug and tune the assignment logic" -- personally, I would find it
infinitely more difficult to tune and debug assignment logic if it were
happening on the broker. But that's just because I work with the app
devs,
not the cluster operators. The point is we shouldn't design an entire
protocol to give preference to one vendor or another -- nobody wants the
ASF to start yelling at us lol D:
Another example, which imo is a good one (IIUC) is that "a simple
parameter
change requires redeploying of all streams clients". That's definitely a
huge bummer, but again, why would broker-side assignors not be able to
trigger a reassignment based on a custom config in the 848 protocol? We
could still have the ha and sticky assignors implemented on the brokers,
and could still define all the Streams configs as broker configs to be
passed into the streams custom broker assignors. That feels like a lot
less
work than redoing our own group protocol from scratch?
I don't want you guys to think I'm trying to block the entire direction
of
this KIP. I'd just like to better understand your thoughts on this since
I
assume you've discussed this at length internally. Just help me
understand
why this huge proposal is justified (for my own sake and to convince any
"others" who might be skeptical)
On Wed, Sep 4, 2024 at 6:00 PM Matthias J. Sax <mj...@apache.org> wrote:
I still need to catch up on the discussion in general. But as promised,
I just started KIP-1088 about the `KafkaClientSupplier` question.
Looking forward to your feedback on the new KIP. I hope we can get
KIP-1088 done soon, to not block this KIP.
-Matthias
On 9/4/24 09:52, Lucas Brutschy wrote:
Hi Sophie,
thanks for the questions and comments!
S1. KIP-1071 does not need to add public interfaces to streams, but
it
does not (yet) make the streams protocol client pluggable. Matthias
promised to write a KIP that will propose a solution for the
KafkaClientSupplier soon. He suggested keeping it separate from this
KIP, but we can treat it as a blocker. You are right that we could
try
fitting an explicit topic initialization in this interface, if we
decide we absolutely need it now, although I'm not sure if that would
actually define a useful workflow for the users to explicitly update
a
topology. Explicit initialization cannot be part of admin tools,
because we do not have the topology there. It could probably be a
config that defaults to off, or a method on the KafkaStreams object?
To me, explicit initialization opens a broad discussion on how to do
it, so I would like to avoid including it in this KIP. I guess it
also
depends on the outcome of the discussion in S8-S10.
S2. It's hard to define the perfect default, and we should probably
aim for safe operations first - since people can always up the limit.
I changed it to 20.
S11. I was going to write we can add it, but it's actually quite
independent of KIP-1071 and also a nice newbie ticket, so I would
propose we treat it as out-of-scope and let somebody define it in a
separate KIP - any discussion about it would anyway be lost in this
long discussion thread.
S12. Member epoch behaves the same as in KIP-848. See the KIP-848
explanation for basics:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup
Together with the section "Reconciliation of the group", this should
make clear how member epochs, group epochs and assignment epochs work
in KIP-1071, but I'll try to summarize the reconciliation a bit more
explicitly below:
Essentially, the broker drives a reconciliation for each member
separately. The goal is to reconcile the assignment generated by the
assignor and reach the assignment epoch, but it's an asynchronous
process, so each member may still be on an older epoch, which is the
member epoch. We do not reinvent any of the internal machinery for
streams here, we are just adding standby tasks and warm-up tasks to
the reconciliation of KIP-848.
During reconciliation, the broker first asks the member to revoke all
tasks that are not in its target assignment, by removing it from the
current assignment for the member (which is provided in the heartbeat
response). Until revocation is completed, the member remains on the
old member epoch and in `UNREVOKED` state. Only once the member
confirms revocation of these tasks by removing them from the set of
owned tasks in the heartbeat request, the member can transition to
the
current assignment epoch and get new tasks assigned. Once the member
has revoked all tasks that are not in its target assignment, it
transitions to the new epoch and gets new tasks assigned. There are
some tasks that may be in the (declarative) target assignment of the
member generated by the assignor, but that may not be immediately
added to the current assignment of the member, because they are still
"unreleased". Unreleased tasks are:
- A. For any task, another instance of the same process may own the
task in any role (active/standby/warm-up), and we are still awaiting
revocation.
- B. For an active task, an instance of any process may own the same
active task, and we are still awaiting revocation.
As long as there is at least one unreleased task, the member remains
in state UNRELEASED. Once all tasks of the target assignment are
added
to the current assignment of the member (because they were released),
the member transitions to STABLE.
Technically, the first constraint (we cannot assign a task in two
different roles to the same process) is only relevant for stateful
tasks that use the local state directory. I wonder if we should relax
this restriction slightly, by marking sub-topologies that access the
local state directory. This information could also be used by the
assignor. But we can also introduce this as a follow-up improvement.
S13i. You are right, these bumps of the group epoch are "on top" of
the KIP-848 reasons for bumping the group epoch. I added the KIP-848
reasons as well, because this was a bit confusing in the KIP.
S13ii. The group epoch is bumped immediately once a warm-up task has
caught up, since this is what triggers running the assignor. However,
the assignment epoch is only bumped once the task assignor has run.
If
the task assignment for a member hasn't changed, it will transition
to
the new assignment epoch immediately.
S13iii. In the case of a rolling bounce, the group epoch will be
bumped for every member that leaves and joins the group. For updating
rack ID, client tags, topology ID etc. -- this will mostly play a
role
for static membership, as you mentioned, but would cause the same
thing - in a rolling bounce with static membership, we will attempt
to
recompute the assignment after every bounce, if the topology ID is
changed in the members. This can cause broker load, as you mentioned.
Note, however, that we don't have to compute an assignment for every
group epoch bump. While KIP-848 computes its assignment as part of
the
main loop of the group coordinator, it uses asynchronous assignment
in
the case of client-side assignment, where the assignment is slower.
So
it will kick off the assignment on the client side, enter state
`ASSIGNING`, but may have more group epoch bumps in the meantime
while
the assignment is computed on the client side. We leave it open to
use
the same mechanism on the broker to compute assignment asynchronously
on a separate thread, in case streams assignment is too slow for the
main loop of the group coordinator. We will also consider
rate-limiting the assignor to reduce broker load. However, it's too
early to define this in detail, as we don't know much about the
performance impact yet.
S13iv. The heartbeat just sends "null" instead of the actual value in
case the value didn't change since the last heartbeat, so no
comparison is required. This is defined in the heartbeat RPC and the
same as in KIP-848.
S13v. I think I disagree. The group epoch represents a version of the
group state, including the current topology and the metadata of all
members -- the whole input to the task assignor -- and defines
exactly
the condition under which we attempt to recompute the assignment. So
it's not overloaded, and has a clear definition. It's also exactly
the
same as in KIP-848, and we do not want to move away too far from that
protocol, unless it's something specific to streams that needs
to be changed. Also, the protocol already uses three types of epochs
-
member epoch, assignment epoch, group epoch. I don't think adding
more
epochs will make things clearer. Unless we have a strong reason why
we
need to add extra epochs, I'm not in favor.
S14. Assignment epoch is the same as in KIP-848:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment
Its definition does not change at all, so we didn't describe it in
detail in KIP-1071.
S15. I agree that the name is a bit generic, but the time to change
this was when `group.instance.id` was introduced. Also, if we start
diverging from regular consumers / consumer groups in terms of
naming,
it will not make things clearer. "application.id" and "group.id" are
bad enough, in my eyes. I hope it's okay if I just improve the field
documentation.
S16. This goes back to KIP-482. Essentially, in flexible request
schema versions you can define optional fields that will be encoded
with a field tag on the wire, but can also be completely omitted.
This is similar to how fields are encoded in, e.g., protobuf. It
improves schema evolution, because we can add new fields to the
schema
without bumping the version. There is also a minor difference in
encoding size: when encoded, tagged fields take up more bytes on the
wire because of the field tag, but they can be omitted completely. It
will become interesting when we want to include extra data after the
initial protocol definition.
Cheers,
Lucas
On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman
<sop...@responsive.dev> wrote:
Ah, my bad -- I thought I refreshed the page to get the latest
version
which is why I was a bit confused when I couldn't find anything
about
the
new tools which I had previously seen in the KIP. Sorry for the
confusion
and unnecessary questions
S1.
You could imagine calling the initialize RPC
explicitly (to implement explicit initialization), but this would
still mean sending an event to the background thread, and the
background thread in turn invokes the RPC. However, explicit
initialization would require some additional public interfaces
that we
are not including in this KIP.
I'm confused -- if we do all the group management stuff in a
background
thread to avoid needing public APIs, how do we pass all the
Streams-specific and app-specific info to the broker for the
StreamsGroupInitialize? I am guessing this is where we need to
invoke
internal APIs and is related to our discussion about removing the
KafkaClientSupplier. However, it seems to me that no matter how you
look at
it, Kafka Streams will need to pass information to and from the
background
thread if the background thread is to be aware of things like tasks,
offset
sums, repartition topics, etc
We don't really need to rehash everything here since it seems like
the
proposal to add a StreamsConsumer/StreamsAssignment interface will
address
this, which I believe we're in agreement on. I just wanted to point
out
that this work to replace the KafkaClientSupplier is clearly
intimately
tied up with KIP-1071, and should imho be part of this KIP and not
its
own
standalone thing.
By the way I'm happy to hop on a call to help move things forward on
that
front (or anything really). Although I guess we're just waiting on a
design
right now?
S2. I was suggesting something lower for the default upper limit on
all
groups. I don't feel too strongly about it, just wanted to point out
that
the tradeoff is not about the assignor runtime but rather about
resource
consumption, and that too many warmups could put undue load on the
cluster.
In the end if we want to trust application operators then I'd say
it's
fine
to use a higher cluster-wide max like 100.
S8-10 I'll get back to you on this in another followup since I'm
still
thinking some things through and want to keep the discussion
rolling for
now. In the meantime I have some additional questions:
S11. One of the main issues with unstable assignments today is the
fact
that assignors rely on deterministic assignment and use the process
Id,
which is not configurable and only persisted via local disk in a
best-effort attempt. It would be a very small change to include
this in
KIP-1071 (like 5 LOC), WDYT? (Would even be willing to do the PR for
this
myself so as not to add to your load). There's a ticket for it here:
https://issues.apache.org/jira/browse/KAFKA-15190
S12. What exactly is the member epoch and how is it defined? When
is it
bumped? I see in the HB request field that certain values signal a
member
joining/leaving, but I take it there are valid positive values
besides
the
0/-1/-2 codes? More on this in the next question:
S13. The KIP says the group epoch is updated in these three cases:
a. Every time the topology is updated through the
StreamsGroupInitialize API
b. When a member with an assigned warm-up task reports a task
changelog
offset and task changelog end offset whose difference is less that
acceptable.recovery.lag.
c. When a member updates its topology ID, rack ID, client tags or
process
ID. Note: Typically, these do not change within the lifetime of a
Streams
client, so this only happens when a member with static membership
rejoins
with an updated configuration.
S13.i First, just to clarify, these are not the *only* times the
group
epoch is bumped but rather the additional cases on top of the
regular
consumer group protocol -- so it's still bumped when a new member
joins
or
leaves, etc, right?
S13.ii Second, does (b) imply the epoch is bumped just whenever the
broker
notices a task has finished warming up, or does it first reassign
the
task
and then bump the group epoch as a result of this task reassignment?
S13.iii Third, (a) mentions the group epoch is bumped on each
StreamsGroupInitialize API but (c) says it gets bumped whenever a
member
updates its topology ID. Does this mean that in the event of a
rolling
upgrade that changes the topology, the group epoch is bumped just
once
or
for each member that updates its topology ID? Or does the "updating
its
topology ID" somehow have something to do with static membership?
S13.iv How does the broker know when a member has changed its rack
ID,
client tags, etc -- does it compare these values on every hb?? That
seems
like a lot of broker load. If this is all strictly for static
membership
then have we considered leaving static membership support for a
followup
KIP? Not necessarily advocating for that, just wondering if this was
thought about already. Imo it would be acceptable to not support
static
membership in the first version (especially if we fixed other issues
like
making the process Id configurable to get actual stable
assignments!)
S13.v I'm finding the concept of group epoch here to be somewhat
overloaded. It sounds like it's trying to keep track of multiple
things
within a single version: the group membership, the topology
version, the
assignment version, and various member statuses (eg rack ID/client
tags).
Personally, I think it would be extremely valuable to unroll all of
this
into separate epochs with clear definitions and boundaries and
triggers.
For example, I would suggest something like this:
group epoch: describes the group at a point in time, bumped when
set of
member ids changes (ie static or dynamic member leaves or joins) --
triggered by group coordinator noticing a change in group membership
topology epoch: describes the topology version, bumped for each
successful
StreamsGroupInitialize that changes the broker's topology ID --
triggered
by group coordinator bumping the topology ID (ie
StreamsGroupInitialize)
(optional) member epoch: describes the member version, bumped when a
memberId changes its rack ID, client tags, or process ID (maybe
topology
ID, not sure) -- is this needed only for static membership? Going
back
to
question S12, when is the member epoch is bumped in the current
proposal?
assignment epoch: describes the assignment version, bumped when the
assignor runs successfully (even if the actual assignment of tasks
does
not
change) -- can be triggered by tasks completing warmup, a manual
client-side trigger (future KIP only), etc (by definition is bumped
any
time any of the other epochs are bumped since they all trigger a
reassignment)
S14. The KIP also mentions a new concept of an "assignment epoch"
but
doesn't seem to ever elaborate on how this is defined and what it's
used
for. I assume it's bumped each time the assignment changes and
represents a
sort of "assignment version", is that right? Can you describe in
more
detail the difference between the group epoch and the assignment
epoch?
S15. I assume the "InstanceId" field in the HB request corresponds
to
the
group.instance.id config of static membership. I always felt this
field
name was too generic and easy to get mixed up with other
identifiers,
WDYT
about "StaticId" or "StaticInstanceId" to make the static membership
relation more explicit?
S16. what is the "flexibleVersions" field in the RPC protocols? I
assume
this is related to versioning compatibility but it's not quite clear
from
the KIP how this is to be used. For example if we modify the
protocol by
adding new fields at the end, we'd bump the version but should the
flexibleVersions field change as well?
On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:
Hi Sophie,
Thanks for your detailed comments - much appreciated! I think you
read
a version of the KIP that did not yet include the admin
command-line
tool and the Admin API extensions, so some of the comments are
already
addressed in the KIP.
S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in
the
consumer background thread. Note that in the new consumer threading
model, all RPCs are run by the background thread. Check out this:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
for more information. Both our RPCs are just part of the group
membership management and do not need to be invoked explicitly by
the
application thread. You could imagine calling the initialize RPC
explicitly (to implement explicit initialization), but this would
still mean sending an event to the background thread, and the
background thread in turn invokes the RPC. However, explicit
initialization would require some additional public interfaces
that we
are not including in this KIP. StreamsGroupDescribe is called by
the
AdminClient, and used by the command-line tool
kafka-streams-groups.sh.
S2. I think the max.warmup.replicas=100 suggested by Nick was
intended
as the upper limit for setting the group configuration on the
broker.
Just to make sure that this was not a misunderstanding. By default,
values above 100 should be rejected when setting a specific value
for
group. Are you suggesting 20 or 30 for the default value for
groups,
or the default upper limit for the group configuration?
S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The
MemberEpoch=-1 is a left-over from an earlier discussion. It means
that the member is leaving the group, so the intention was that the
member must leave the group when it asks the other members to shut
down. We later reconsidered this and decided that all applications
should just react to the shutdown application signal that is
returned
by the broker, so the client first sets the ShutdownApplication and
later leaves the group. Thanks for spotting this, I removed it.
S4. Not sure if this refers to the latest version of the KIP. We
added
an extension of the admin API and a kafka-streams-groups.sh
command-line tool to the KIP already.
S5. All RPCs for dealing with offsets will keep working with
streams
groups. The extension of the admin API is rather cosmetic, since
the
method names use "consumer group". The RPCs, however, are generic
and
do not need to be changed.
S6. Yes, you can use the DeleteGroup RPC with any group ID, whether
streams group or not.
S7. See the admin API section.
S8. I guess for both A and B, I am not sure what you are
suggesting.
Do you want to make the broker-side topology immutable and not
include
any information about the topology, like the topology ID in the
RPC?
It would seem that this would be a massive foot-gun for people, if
they start changing their topology and don't notice that the
broker is
looking at a completely different version of the topology. Or did
you
mean that there is some kind of topology ID, so that at least we
can
detect inconsistencies between broker and client-side topologies,
and
we fence out any member with an incorrect topology ID? Then we
seem to
end up with mostly the same RPCs and the same questions (how is the
topology ID generated?). I agree that the latter could be an
option.
See summary at the end of this message.
S9. If we flat out refuse topology updates, agree - we cannot let
the
application crash on minor changes to the topology. However, if we
allow topology updates as described in the KIP, there are only
upsides
to making the topology ID more sensitive. All it will cause is
that a
client will have to resend a `StreamsGroupInitialize`, and during
the
rolling bounce, older clients will not get tasks assigned.
S10. The intention in the KIP is really just this - old clients can
only retain tasks, but not get new ones. If the topology has
sub-topologies, these will initially be assigned to the new
clients,
which also have the new topology. You are right that rolling an
application where the old structure and the new structure are
incompatible (e.g. different subtopologies access the same
partition)
will cause problems. But this will also cause problems in the
current
protocol, so I'm not sure, if it's strictly a regression, it's just
unsupported (which we can only make the best effort to detect).
In summary, for the topology ID discussion, I mainly see two
options:
1) stick with the current KIP proposal
2) define the topology ID as the hash of the StreamsGroupInitialize
topology metadata only, as you suggested, and fence out members
with
an incompatible topology ID.
I think doing 2 is also a good option, but in the end it comes
down to
how important we believe topology updates (where the set of
sub-topologies or set of internal topics are affected) to be. The
main
goal is to make the new protocol a drop-in replacement for the old
protocol, and at least define the RPCs in a way that we won't need
another KIP which bumps the RPC version to make the protocol
production-ready. Adding more command-line tools, or public
interfaces
seems less critical as it doesn't change the protocol and can be
done
easily later on. The main question becomes - is the protocol
production-ready and sufficient to replace the classic protocol if
we
go with 2?
Cheers,
Lucas