I still need to catch up on the discussion in general. But as promised, I just started KIP-1088 about the `KafkaClientSupplier` question.

Looking forward to your feedback on the new KIP. I hope we can get KIP-1088 done soon, to not block this KIP.


-Matthias

On 9/4/24 09:52, Lucas Brutschy wrote:
Hi Sophie,

thanks for the questions and comments!

S1. KIP-1071 does not need to add public interfaces to streams, but it
does not (yet) make the streams protocol client pluggable. Matthias
promised to write a KIP that will propose a solution for the
KafkaClientSupplier soon. He suggested keeping it separate from this
KIP, but we can treat it as a blocker. You are right that we could try
fitting an explicit topic initialization in this interface, if we
decide we absolutely need it now, although I'm not sure if that would
actually define a useful workflow for the users to explicitly update a
topology. Explicit initialization cannot be part of admin tools,
because we do not have the topology there. It could probably be a
config that defaults to off, or a method on the KafkaStreams object?
To me, explicit initialization opens a broad discussion on how to do
it, so I would like to avoid including it in this KIP. I guess it also
depends on the outcome of the discussion in S8-S10.

S2. It's hard to define the perfect default, and we should probably
aim for safe operations first - since people can always up the limit.
I changed it to 20.

S11. I was going to write we can add it, but it's actually quite
independent of KIP-1071 and also a nice newbie ticket, so I would
propose we treat it as out-of-scope and let somebody define it in a
separate KIP - any discussion about it would anyway be lost in this
long discussion thread.

S12. Member epoch behaves the same as in KIP-848. See the KIP-848
explanation for basics:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-MemberEpoch-Reconciliationofthegroup

Together with the section "Reconciliation of the group", this should
make clear how member epochs, group epochs and assignment epochs work
in KIP-1071, but I'll try to summarize the reconciliation a bit more
explicitly below:

Essentially, the broker drives a reconciliation for each member
separately. The goal is to reconcile the assignment generated by the
assignor and reach the assignment epoch, but it's an asynchronous
process, so each member may still be on an older epoch, which is the
member epoch. We do not reinvent any of the internal machinery for
streams here, we are just adding standby tasks and warm-up tasks to
the reconciliation of KIP-848.

During reconciliation, the broker first asks the member to revoke all
tasks that are not in its target assignment, by removing them from the
current assignment for the member (which is provided in the heartbeat
response). Until revocation is completed, the member remains on the
old member epoch and in `UNREVOKED` state. Only once the member
confirms revocation of these tasks, by removing them from the set of
owned tasks in the heartbeat request, can it transition to the
current assignment epoch and get new tasks assigned. There are
some tasks that may be in the (declarative) target assignment of the
member generated by the assignor, but that may not be immediately
added to the current assignment of the member, because they are still
"unreleased". Unreleased tasks are:

- A. For any task, another instance of the same process may own the
task in any role (active/standby/warm-up), and we are still awaiting
revocation.

- B. For an active task, an instance of any process may own the same
active task, and we are still awaiting revocation.

As long as there is at least one unreleased task, the member remains
in state UNRELEASED. Once all tasks of the target assignment are added
to the current assignment of the member (because they were released),
the member transitions to STABLE.
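
To make the three states concrete, here is a minimal sketch of the
decision described above. It is only an illustration: the class,
method and parameter names are hypothetical, tasks are represented as
plain strings, and the real group coordinator tracks far more state
than this.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of the per-member reconciliation; not the broker's actual code. */
final class ReconciliationSketch {
    enum MemberState { UNREVOKED, UNRELEASED, STABLE }

    /**
     * @param owned      tasks the member reported as owned in its last heartbeat request
     * @param target     tasks in the member's target assignment (output of the assignor)
     * @param unreleased target tasks that another member still owns (cases A and B)
     */
    static MemberState nextState(Set<String> owned, Set<String> target, Set<String> unreleased) {
        // Step 1: anything owned but not in the target assignment must be revoked first.
        Set<String> toRevoke = new HashSet<>(owned);
        toRevoke.removeAll(target);
        if (!toRevoke.isEmpty()) {
            return MemberState.UNREVOKED; // member stays on its old member epoch
        }
        // Step 2: target tasks that are still unreleased cannot be handed out yet.
        Set<String> waiting = new HashSet<>(target);
        waiting.retainAll(unreleased);
        if (!waiting.isEmpty()) {
            return MemberState.UNRELEASED;
        }
        // Step 3: every target task can be added to the current assignment.
        return MemberState.STABLE;
    }

    public static void main(String[] args) {
        // Task 0_0 is owned but no longer in the target: revocation is pending.
        System.out.println(nextState(Set.of("0_0", "0_1"), Set.of("0_1", "0_2"), Set.of()));
        // Task 0_2 is in the target but another member has not released it yet.
        System.out.println(nextState(Set.of("0_1"), Set.of("0_1", "0_2"), Set.of("0_2")));
        // Nothing to revoke and nothing unreleased.
        System.out.println(nextState(Set.of("0_1"), Set.of("0_1", "0_2"), Set.of()));
    }
}
```

The three calls in `main` walk through one member passing from
UNREVOKED to UNRELEASED to STABLE.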

Technically, the first constraint (we cannot assign a task in two
different roles to the same process) is only relevant for stateful
tasks that use the local state directory. I wonder if we should relax
this restriction slightly, by marking sub-topologies that access the
local state directory. This information could also be used by the
assignor. But we can also introduce this as a follow-up improvement.

S13i. You are right, these bumps of the group epoch are "on top" of
the KIP-848 reasons for bumping the group epoch. I added the KIP-848
reasons as well, because this was a bit confusing in the KIP.

S13ii. The group epoch is bumped immediately once a warm-up task has
caught up, since this is what triggers running the assignor. However,
the assignment epoch is only bumped once the task assignor has run. If
the task assignment for a member hasn't changed, it will transition to
the new assignment epoch immediately.
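
A rough sketch of that ordering, purely for illustration: the class
and method names are made up, the starting epoch values are arbitrary,
and setting the assignment epoch to the current group epoch is a
simplification that ignores bumps arriving while the assignor runs.
The comparison uses "less than", following the wording quoted from the
KIP in this thread.

```java
/** Hypothetical sketch of the epoch bookkeeping in S13ii; names are illustrative only. */
final class EpochSketch {
    int groupEpoch = 5;       // version of the input to the task assignor
    int assignmentEpoch = 5;  // epoch of the last computed target assignment

    /** A warm-up task counts as caught up once its changelog lag is below the threshold. */
    static boolean caughtUp(long changelogOffset, long changelogEndOffset, long acceptableRecoveryLag) {
        return changelogEndOffset - changelogOffset < acceptableRecoveryLag;
    }

    /** Called when a heartbeat reports offsets for an assigned warm-up task. */
    void onWarmupOffsets(long changelogOffset, long changelogEndOffset, long acceptableRecoveryLag) {
        if (caughtUp(changelogOffset, changelogEndOffset, acceptableRecoveryLag)) {
            groupEpoch++; // bumped immediately; this is what triggers the assignor
        }
    }

    /** Called once the task assignor has produced a new target assignment. */
    void onAssignorFinished() {
        assignmentEpoch = groupEpoch; // only now does the assignment epoch advance
    }

    public static void main(String[] args) {
        EpochSketch group = new EpochSketch();
        group.onWarmupOffsets(90, 100, 50); // lag of 10 is below 50
        System.out.println(group.groupEpoch + " / " + group.assignmentEpoch);
        group.onAssignorFinished();
        System.out.println(group.groupEpoch + " / " + group.assignmentEpoch);
    }
}
```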

S13iii. In the case of a rolling bounce, the group epoch will be
bumped for every member that leaves and joins the group. For updating
rack ID, client tags, topology ID etc. -- this will mostly play a role
for static membership, as you mentioned, but would cause the same
thing - in a rolling bounce with static membership, we will attempt to
recompute the assignment after every bounce, if the topology ID is
changed in the members. This can cause broker load, as you mentioned.
Note, however, that we don't have to compute an assignment for every
group epoch bump. While KIP-848 computes its assignment as part of the
main loop of the group coordinator, it uses asynchronous assignment in
the case of client-side assignment, where the assignment is slower. So
it will kick off the assignment on the client side, enter state
`ASSIGNING`, but may have more group epoch bumps in the meantime while
the assignment is computed on the client side. We leave it open to use
the same mechanism on the broker to compute assignment asynchronously
on a separate thread, in case streams assignment is too slow for the
main loop of the group coordinator. We will also consider
rate-limiting the assignor to reduce broker load. However, it's too
early to define this in detail, as we don't know much about the
performance impact yet.

S13iv. The heartbeat just sends "null" instead of the actual value in
case the value didn't change since the last heartbeat, so no
comparison is required. This is defined in the heartbeat RPC and the
same as in KIP-848.
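
As an illustration of the "send null if unchanged" idea, a client
could remember the last value it sent for a field. This is only a
sketch with hypothetical names, shown for a single field (the rack
ID); it is not the actual client code.

```java
import java.util.Objects;

/** Hypothetical client-side helper: decides what to put in the next heartbeat for one field. */
final class HeartbeatFieldSketch {
    private String lastSentRackId; // null until the first heartbeat has been built

    /** Returns the rack ID if it changed since the last heartbeat, otherwise null. */
    String rackIdForNextHeartbeat(String currentRackId) {
        if (Objects.equals(currentRackId, lastSentRackId)) {
            return null; // unchanged: the broker keeps the value it already has
        }
        lastSentRackId = currentRackId;
        return currentRackId;
    }

    public static void main(String[] args) {
        HeartbeatFieldSketch hb = new HeartbeatFieldSketch();
        System.out.println(hb.rackIdForNextHeartbeat("rack-a")); // first heartbeat carries the value
        System.out.println(hb.rackIdForNextHeartbeat("rack-a")); // unchanged, so null is sent
        System.out.println(hb.rackIdForNextHeartbeat("rack-b")); // changed, so the new value is sent
    }
}
```

With this scheme the broker only has to act on non-null fields, so it
never compares values itself.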

S13v. I think I disagree. The group epoch represents a version of the
group state, including the current topology and the metadata of all
members -- the whole input to the task assignor -- and defines exactly
the condition under which we attempt to recompute the assignment. So
it's not overloaded, and has a clear definition. It's also exactly the
same as in KIP-848, and we do not want to move away too far from that
protocol, unless it's something that is specific to streams that needs
to be changed. Also, the protocol already uses three types of epochs -
member epoch, assignment epoch, group epoch. I don't think adding more
epochs will make things clearer. Unless we have a strong reason why we
need to add extra epochs, I'm not in favor.

S14. Assignment epoch is the same as in KIP-848:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol#KIP848:TheNextGenerationoftheConsumerRebalanceProtocol-AssignmentEpoch-Computethegroupassignment

Its definition does not change at all, so we didn't describe it in
detail in KIP-1071.

S15. I agree that the name is a bit generic, but the time to change
this was when `group.instance.id` was introduced. Also, if we start
diverging from regular consumers / consumer groups in terms of naming,
it will not make things clearer. "application.id" and "group.id" are
bad enough, in my eyes. I hope it's okay if I just improve the field
documentation.

S16. This goes back to KIP-482. Essentially, in flexible request
schema versions you can define optional fields that will be encoded
with a field tag on the wire, but can also be completely omitted.
This is similar to how fields are encoded in, e.g., protobuf. It
improves schema evolution, because we can add new fields to the schema
without bumping the version. There is also a minor difference in
encoding size: when encoded, tagged fields take up more bytes on the
wire because of the field tag, but they can be omitted completely. It
will become interesting when we want to include extra data after the
initial protocol definition.
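
To illustrate the size trade-off, here is a simplified sketch of how a
single tagged field could be laid out, assuming the layout I remember
from KIP-482: the tag and the payload size as unsigned varints,
followed by the payload bytes. The class and method names are
hypothetical, and the real serialization code in Kafka is generated
and handles more cases.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Simplified, hypothetical sketch of one tagged field on the wire. */
final class TaggedFieldSketch {
    /** Writes an unsigned varint: 7 payload bits per byte, high bit set on all but the last byte. */
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    /** Encodes one tagged field as: tag, payload length, payload. */
    static byte[] encodeTaggedField(int tag, byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeUnsignedVarint(tag, out);
        writeUnsignedVarint(payload.length, out);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] payload = "abc".getBytes(StandardCharsets.UTF_8);
        // Tag and length each fit in one varint byte here, so the field
        // costs two bytes on top of the payload; an omitted field costs nothing.
        System.out.println(encodeTaggedField(0, payload).length);
    }
}
```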

Cheers,
Lucas

On Tue, Sep 3, 2024 at 10:36 PM Sophie Blee-Goldman
<sop...@responsive.dev> wrote:

Ah, my bad -- I thought I refreshed the page to get the latest version
which is why I was a bit confused when I couldn't find anything about the
new tools which I had previously seen in the KIP. Sorry for the confusion
and unnecessary questions

S1.

You could imagine calling the initialize RPC
explicitly (to implement explicit initialization), but this would
still mean sending an event to the background thread, and the
background thread in turn invokes the RPC. However, explicit
initialization would require some additional public interfaces that we
are not including in this KIP.

I'm confused -- if we do all the group management stuff in a background
thread to avoid needing public APIs, how do we pass all the
Streams-specific and app-specific info to the broker for the
StreamsGroupInitialize? I am guessing this is where we need to invoke
internal APIs and is related to our discussion about removing the
KafkaClientSupplier. However, it seems to me that no matter how you look at
it, Kafka Streams will need to pass information to and from the background
thread if the background thread is to be aware of things like tasks, offset
sums, repartition topics, etc

We don't really need to rehash everything here since it seems like the
proposal to add a StreamsConsumer/StreamsAssignment interface will address
this, which I believe we're in agreement on. I just wanted to point out
that this work to replace the KafkaClientSupplier is clearly intimately
tied up with KIP-1071, and should imho be part of this KIP and not its own
standalone thing.

By the way I'm happy to hop on a call to help move things forward on that
front (or anything really). Although I guess we're just waiting on a design
right now?

S2. I was suggesting something lower for the default upper limit on all
groups. I don't feel too strongly about it, just wanted to point out that
the tradeoff is not about the assignor runtime but rather about resource
consumption, and that too many warmups could put undue load on the cluster.
In the end if we want to trust application operators then I'd say it's fine
to use a higher cluster-wide max like 100.

S8-10 I'll get back to you on this in another followup since I'm still
thinking some things through and want to keep the discussion rolling for
now. In the meantime I have some additional questions:

S11. One of the main issues with unstable assignments today is the fact
that assignors rely on deterministic assignment and use the process Id,
which is not configurable and only persisted via local disk in a
best-effort attempt. It would be a very small change to include this in
KIP-1071 (like 5 LOC), WDYT? (Would even be willing to do the PR for this
myself so as not to add to your load). There's a ticket for it here:
https://issues.apache.org/jira/browse/KAFKA-15190

S12. What exactly is the member epoch and how is it defined? When is it
bumped? I see in the HB request field that certain values signal a member
joining/leaving, but I take it there are valid positive values besides the
0/-1/-2 codes? More on this in the next question:

S13. The KIP says the group epoch is updated in these three cases:
a. Every time the topology is updated through the StreamsGroupInitialize API
b. When a member with an assigned warm-up task reports a task changelog
offset and task changelog end offset whose difference is less than
acceptable.recovery.lag.
c. When a member updates its topology ID, rack ID, client tags or process
ID. Note: Typically, these do not change within the lifetime of a Streams
client, so this only happens when a member with static membership rejoins
with an updated configuration.

S13.i First, just to clarify, these are not the *only* times the group
epoch is bumped but rather the additional cases on top of the regular
consumer group protocol -- so it's still bumped when a new member joins or
leaves, etc, right?
S13.ii Second, does (b) imply the epoch is bumped just whenever the broker
notices a task has finished warming up, or does it first reassign the task
and then bump the group epoch as a result of this task reassignment?
S13.iii Third, (a) mentions the group epoch is bumped on each
StreamsGroupInitialize API but (c) says it gets bumped whenever a member
updates its topology ID. Does this mean that in the event of a rolling
upgrade that changes the topology, the group epoch is bumped just once or
for each member that updates its topology ID? Or does the "updating its
topology ID" somehow have something to do with static membership?
S13.iv How does the broker know when a member has changed its rack ID,
client tags, etc -- does it compare these values on every hb?? That seems
like a lot of broker load. If this is all strictly for static membership
then have we considered leaving static membership support for a followup
KIP? Not necessarily advocating for that, just wondering if this was
thought about already. Imo it would be acceptable to not support static
membership in the first version (especially if we fixed other issues like
making the process Id configurable to get actual stable assignments!)
S13.v I'm finding the concept of group epoch here to be somewhat
overloaded. It sounds like it's trying to keep track of multiple things
within a single version: the group membership, the topology version, the
assignment version, and various member statuses (eg rack ID/client tags).
Personally, I think it would be extremely valuable to unroll all of this
into separate epochs with clear definitions and boundaries and triggers.
For example, I would suggest something like this:

- group epoch: describes the group at a point in time, bumped when the set of
member ids changes (ie static or dynamic member leaves or joins) --
triggered by group coordinator noticing a change in group membership
- topology epoch: describes the topology version, bumped for each successful
StreamsGroupInitialize that changes the broker's topology ID -- triggered
by group coordinator bumping the topology ID (ie StreamsGroupInitialize)
- (optional) member epoch: describes the member version, bumped when a
memberId changes its rack ID, client tags, or process ID (maybe topology
ID, not sure) -- is this needed only for static membership? Going back to
question S12, when is the member epoch bumped in the current proposal?
- assignment epoch: describes the assignment version, bumped when the
assignor runs successfully (even if the actual assignment of tasks does not
change) -- can be triggered by tasks completing warmup, a manual
client-side trigger (future KIP only), etc (by definition is bumped any
time any of the other epochs are bumped since they all trigger a
reassignment)

S14. The KIP also mentions a new concept of an "assignment epoch" but
doesn't seem to ever elaborate on how this is defined and what it's used
for. I assume it's bumped each time the assignment changes and represents a
sort of "assignment version", is that right? Can you describe in more
detail the difference between the group epoch and the assignment epoch?

S15. I assume the "InstanceId" field in the HB request corresponds to the
group.instance.id config of static membership. I always felt this field
name was too generic and easy to get mixed up with other identifiers, WDYT
about "StaticId" or "StaticInstanceId" to make the static membership
relation more explicit?

S16. what is the "flexibleVersions" field in the RPC protocols? I assume
this is related to versioning compatibility but it's not quite clear from
the KIP how this is to be used. For example if we modify the protocol by
adding new fields at the end, we'd bump the version but should the
flexibleVersions field change as well?

On Wed, Aug 28, 2024 at 5:06 AM Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:

Hi Sophie,

Thanks for your detailed comments - much appreciated! I think you read
a version of the KIP that did not yet include the admin command-line
tool and the Admin API extensions, so some of the comments are already
addressed in the KIP.

S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in the
consumer background thread. Note that in the new consumer threading
model, all RPCs are run by the background thread. Check out this:

https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
for more information. Both our RPCs are just part of the group
membership management and do not need to be invoked explicitly by the
application thread. You could imagine calling the initialize RPC
explicitly (to implement explicit initialization), but this would
still mean sending an event to the background thread, and the
background thread in turn invokes the RPC. However, explicit
initialization would require some additional public interfaces that we
are not including in this KIP. StreamsGroupDescribe is called by the
AdminClient, and used by the command-line tool
kafka-streams-groups.sh.

S2. I think the max.warmup.replicas=100 suggested by Nick was intended
as the upper limit for setting the group configuration on the broker.
Just to make sure that this was not a misunderstanding. By default,
values above 100 should be rejected when setting a specific value for
a group. Are you suggesting 20 or 30 for the default value for groups,
or the default upper limit for the group configuration?

S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The
MemberEpoch=-1 is a left-over from an earlier discussion. It means
that the member is leaving the group, so the intention was that the
member must leave the group when it asks the other members to shut
down. We later reconsidered this and decided that all applications
should just react to the shutdown application signal that is returned
by the broker, so the client first sets the ShutdownApplication and
later leaves the group. Thanks for spotting this, I removed it.

S4. Not sure if this refers to the latest version of the KIP. We added
an extension of the admin API and a kafka-streams-groups.sh
command-line tool to the KIP already.

S5. All RPCs for dealing with offsets will keep working with streams
groups. The extension of the admin API is rather cosmetic, since the
method names use "consumer group". The RPCs, however, are generic and
do not need to be changed.

S6. Yes, you can use the DeleteGroup RPC with any group ID, whether
streams group or not.

S7. See the admin API section.

S8. I guess for both A and B, I am not sure what you are suggesting.
Do you want to make the broker-side topology immutable and not include
any information about the topology, like the topology ID in the RPC?
It would seem that this would be a massive foot-gun for people, if
they start changing their topology and don't notice that the broker is
looking at a completely different version of the topology. Or did you
mean that there is some kind of topology ID, so that at least we can
detect inconsistencies between broker and client-side topologies, and
we fence out any member with an incorrect topology ID? Then we seem to
end up with mostly the same RPCs and the same questions (how is the
topology ID generated?). I agree that the latter could be an option.
See summary at the end of this message.

S9. If we flat out refuse topology updates, agree - we cannot let the
application crash on minor changes to the topology. However, if we
allow topology updates as described in the KIP, there are only upsides
to making the topology ID more sensitive. All it will cause is that a
client will have to resend a `StreamsGroupInitialize`, and during the
rolling bounce, older clients will not get tasks assigned.

S10. The intention in the KIP is really just this - old clients can
only retain tasks, but not get new ones. If the topology has
sub-topologies, these will initially be assigned to the new clients,
which also have the new topology. You are right that rolling an
application where the old structure and the new structure are
incompatible (e.g. different subtopologies access the same partition)
will cause problems. But this will also cause problems in the current
protocol, so I'm not sure it's strictly a regression; it's just
unsupported (which we can only make a best effort to detect).

In summary, for the topology ID discussion, I mainly see two options:
1) stick with the current KIP proposal
2) define the topology ID as the hash of the StreamsGroupInitialize
topology metadata only, as you suggested, and fence out members with
an incompatible topology ID.
I think doing 2 is also a good option, but in the end it comes down to
how important we believe topology updates (where the set of
sub-topologies or set of internal topics are affected) to be. The main
goal is to make the new protocol a drop-in replacement for the old
protocol, and at least define the RPCs in a way that we won't need
another KIP which bumps the RPC version to make the protocol
production-ready. Adding more command-line tools, or public interfaces
seems less critical as it doesn't change the protocol and can be done
easily later on. The main question becomes - is the protocol
production-ready and sufficient to replace the classic protocol if we
go with 2?
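
To make option 2 a bit more tangible, a sketch of what "hash of the
topology metadata plus fencing" could look like. Everything here is an
assumption for illustration: the metadata is reduced to a list of
strings, SHA-256 is just one possible hash, and the names are
hypothetical. The KIP does not define any of this.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

/** Hypothetical sketch of option 2: topology ID derived from the initialize metadata only. */
final class TopologyIdSketch {
    /** Hashes the sub-topology descriptions in order; SHA-256 is an arbitrary choice here. */
    static String topologyId(List<String> subtopologyDescriptions) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (String description : subtopologyDescriptions) {
                digest.update(description.getBytes(StandardCharsets.UTF_8));
                digest.update((byte) 0); // separator so ["ab","c"] and ["a","bc"] differ
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    /** A member whose topology ID differs from the broker's would be fenced out. */
    static boolean shouldFence(String brokerTopologyId, String memberTopologyId) {
        return !brokerTopologyId.equals(memberTopologyId);
    }

    public static void main(String[] args) {
        String broker = topologyId(List.of("sub0: source=input, repartition=app-repartition"));
        String oldClient = topologyId(List.of("sub0: source=input"));
        System.out.println(shouldFence(broker, broker));    // same metadata, not fenced
        System.out.println(shouldFence(broker, oldClient)); // different metadata, fenced
    }
}
```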

Cheers,
Lucas
