Hi all,
For your info, I updated the StreamsGroupInitialize request with the
following changes:
1. I added the topology ID to the request so that the group coordinator
knows for which topology it got the initialization.
2. I renamed field "Subtopology" to "SubtopologyId" since the field is
the ID of the subtopology but that was not clear from the name.
Best,
Bruno
On 8/28/24 2:06 PM, Lucas Brutschy wrote:
Hi Sophie,
Thanks for your detailed comments - much appreciated! I think you read
a version of the KIP that did not yet include the admin command-line
tool and the Admin API extensions, so some of the comments are already
addressed in the KIP.
S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in the
consumer background thread. Note that in the new consumer threading
model, all RPCs are run by the background thread. Check out this:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
for more information. Both our RPCs are just part of the group
membership management and do not need to be invoked explicitly by the
application thread. You could imagine calling the initialize RPC
explicitly (to implement explicit initialization), but this would
still mean sending an event to the background thread, and the
background thread in turn invokes the RPC. However, explicit
initialization would require some additional public interfaces that we
are not including in this KIP. StreamsGroupDescribe is called by the
AdminClient, and used by the command-line tool
kafka-streams-groups.sh.
S2. I think the max.warmup.replicas=100 suggested by Nick was intended
as the upper limit for setting the group configuration on the broker.
Just to make sure that this was not a misunderstanding. By default,
values above 100 should be rejected when setting a specific value for
group. Are you suggesting 20 or 30 for the default value for groups,
or the default upper limit for the group configuration?
S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The
MemberEpoch=-1 is a left-over from an earlier discussion. It means
that the member is leaving the group, so the intention was that the
member must leave the group when it asks the other members to shut
down. We later reconsidered this and decided that all applications
should just react to the shutdown application signal that is returned
by the broker, so the client first sets the ShutdownApplication and
later leaves the group. Thanks for spotting this, I removed it.
S4. Not sure if this refers to the latest version of the KIP. We added
an extension of the admin API and a kafka-streams-groups.sh
command-line tool to the KIP already.
S5. All RPCs for dealing with offsets will keep working with streams
groups. The extension of the admin API is rather cosmetic, since the
method names use "consumer group". The RPCs, however, are generic and
do not need to be changed.
S6. Yes, you can use the DeleteGroup RPC with any group ID, whether
streams group or not.
S7. See the admin API section.
S8. I guess for both A and B, I am not sure what you are suggesting.
Do you want to make the broker-side topology immutable and not include
any information about the topology, like the topology ID in the RPC?
It would seem that this would be a massive food-gun for people, if
they start changing their topology and don't notice that the broker is
looking at a completely different version of the topology. Or did you
mean that there is some kind of topology ID, so that at least we can
detect inconsistencies between broker and client-side topologies, and
we fence out any member with an incorrect topology ID? Then we seem to
end up with mostly the same RPCs and the same questions (how is the
topology ID generated?). I agree that the latter could be an option.
See summary at the end of this message.
S9. If we flat out refuse topology updates, agree - we cannot let the
application crash on minor changes to the topology. However, if we
allow topology updates as described in the KIP, there are only upsides
to making the topology ID more sensitive. All it will cause is that a
client will have to resend a `StreamsGroupInitialize`, and during the
rolling bounce, older clients will not get tasks assigned.
S10. The intention in the KIP is really just this - old clients can
only retain tasks, but not get new ones. If the topology has
sub-topologies, these will initially be assigned to the new clients,
which also have the new topology. You are right that rolling an
application where the old structure and the new structure are
incompatible (e.g. different subtopologies access the same partition)
will cause problems. But this will also cause problems in the current
protocol, so I'm not sure, if it's strictly a regression, it's just
unsupported (which we can only make the best effort to detect).
In summary, for the topology ID discussion, I mainly see two options:
1) stick with the current KIP proposal
2) define the topology ID as the hash of the StreamsGroupInitialize
topology metadata only, as you suggested, and fence out members with
an incompatible topology ID.
I think doing 2 is also a good option, but in the end it comes down to
how important we believe topology updates (where the set of
sub-topologies or set of internal topics are affected) to be. The main
goal is to make the new protocol a drop-in replacement for the old
protocol, and at least define the RPCs in a way that we won't need
another KIP which bumps the RPC version to make the protocol
production-ready. Adding more command-line tools, or public interfaces
seems less critical as it doesn't change the protocol and can be done
easily later on. The main question becomes - is the protocol
production-ready and sufficient to replace the classic protocol if we
go with 2?
Cheers,
Lucas