Hi Lianet,

Thanks for the review!

Here are my answers:

LM1. Is your question whether we need to send a full heartbeat each time the member re-joins the group even if the information in the RPC did not change since the last heartbeat?

LM2. Is the reason for sending the instance ID each time that a member could shut down, change the instance ID, and then start and heartbeat again, but the group coordinator would never notice that the instance ID changed?

LM3. I see your point. I am wondering whether this additional information is worth the dependency between the group types. To return INVALID_GROUP_TYPE, the group coordinator needs to know that a group ID exists with a different group type. With a group coordinator as we have it now in Apache Kafka that manages all group types, that is not a big deal, but imagine if we (or some implementation of the Apache Kafka protocol) decide to have a separate group coordinator for each group type.

LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to me. I am going to change that.
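
To make the LM3 and LM4 options concrete, here is a minimal Python sketch (not Kafka code) of how a coordinator that manages all group types might choose the error for a streams RPC. INVALID_GROUP_TYPE is only the error proposed in LM3; the `groups` dictionary, the `validate_streams_group` function and the `distinguish_type` flag are hypothetical names used for illustration.

```python
from enum import Enum


class GroupType(Enum):
    CONSUMER = "consumer"
    SHARE = "share"
    STREAMS = "streams"


def validate_streams_group(groups, group_id, distinguish_type=True):
    """Return an error name for a streams RPC, or None if the group is usable.

    `groups` maps group ID -> GroupType and stands in for coordinator state.
    """
    if not group_id:
        # LM4: an empty group ID gets the specific error, not INVALID_REQUEST.
        return "INVALID_GROUP_ID"
    group_type = groups.get(group_id)
    if group_type is None:
        return "GROUP_ID_NOT_FOUND"
    if group_type is not GroupType.STREAMS:
        # LM3: this branch requires knowledge of the other group types.
        return "INVALID_GROUP_TYPE" if distinguish_type else "GROUP_ID_NOT_FOUND"
    return None
```

With a separate coordinator per group type, `groups` would only contain streams groups, so the last error branch could never be taken. That is the dependency concern raised in LM3.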

LM5. I think there is a dependency from the StreamsGroupInitialize RPC to the heartbeat. The group must exist when the initialize RPC is received by the group coordinator. The group is created by the heartbeat RPC. I would be in favor of making the initialize RPC independent from the heartbeat RPC. That would allow initializing a streams group explicitly with an admin tool.

LM6. I think it affects streams, and streams should behave like the consumer group.

LM7. Good point, we will consider it.

LM8. Fixed! Thanks!


Best,
Bruno




On 7/19/24 9:53 PM, Lianet M. wrote:
Hi Lucas/Bruno, thanks for the great KIP! First comments:

LM1. Related to where the KIP says: “Group ID, member ID, member epoch
are sent with each heartbeat request. Any other information that has not
changed since the last heartbeat can be omitted.” I expect all the other
info also needs to be sent whenever a full heartbeat is required (even if
it didn’t change from the last heartbeat), e.g. in fencing scenarios,
correct?
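
The rule under discussion can be sketched as follows. This is a minimal Python illustration, not the client implementation: `build_heartbeat`, the `full` flag and the field names are hypothetical, and the assumption is that a full heartbeat simply resends every field.

```python
ALWAYS_SENT = ("group_id", "member_id", "member_epoch")


def build_heartbeat(current, last_sent, full=False):
    """Build a heartbeat payload from the member's current state.

    Fields in ALWAYS_SENT go out every time. Other fields are sent only if
    they changed since `last_sent`, unless `full` is set (for example when
    the member was fenced and has to rejoin).
    """
    request = {name: current[name] for name in ALWAYS_SENT}
    for name, value in current.items():
        if name in ALWAYS_SENT:
            continue
        if full or last_sent is None or last_sent.get(name) != value:
            request[name] = value
    return request
```

Passing `last_sent=None` models the very first heartbeat, where nothing can be omitted.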

LM2. For consumer groups we always send the groupInstanceId (if any) as
part of every heartbeat, along with memberId, epoch and groupId. Should we
consider that too here?

LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to
the stream-specific RPCs if the groupId is associated with a group type
that is not streams (i.e. consumer group or share group). I wonder if at
this point, where we're getting several new group types added, each with
RPCs that are supposed to include groupId of a certain type, we should be
more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE
(group exists but not with a valid type for this RPC) vs a
GROUP_ID_NOT_FOUND (group does not exist).  Those errors would be
consistently used across consumer, share, and streams RPCs whenever the
group id is not of the expected type.
This is truly not specific to this KIP, and should be addressed with all
group types and their RPCs in mind. I just wanted to bring out my concern
and get thoughts around it.

LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if
groupId is empty. There is already an INVALID_GROUP_ID error, that seems
more specific to this situation. Error handling of specific errors would
definitely be easier than having to deal with a generic INVALID_REQUEST
(and probably its custom message). I know that for KIP-848 we have
INVALID_REQUEST for similar situations, so if we ever go down this path
we should review it there too for consistency. Thoughts?

LM5. The dependency between the StreamsGroupHeartbeat RPC and the
StreamsGroupInitialize RPC is one-way only right? HB requires a previous
StreamsGroupInitialize request, but StreamsGroupInitialize processing is
totally independent of heartbeats (and could perfectly be processed without
a previous HB, even though the client implementation we’re proposing won’t
go down that path). Is my understanding correct? Just to double check,
seems sensible like that at the protocol level.

LM6. With KIP-848, there is an important improvement that brings a
difference in behaviour around the static membership: with the classic
protocol, if a static member joins with a group instance already in use, it
makes the initial member fail with a FENCED_INSTANCE_ID exception, vs.
with the new consumer group protocol, the second member trying to join
fails with an UNRELEASED_INSTANCE_ID. Does this change need to be
considered in any way for the streams app? (I'm not familiar with KS yet,
but thought it was worth asking. If it doesn't affect it in any way, it is
still maybe helpful to call it out in a section on static membership)
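
The behavioural difference described in LM6 can be sketched like this. It is a minimal Python illustration of the two outcomes only; `join_static_member`, the `owners` map and the protocol labels are hypothetical names, not Kafka APIs.

```python
def join_static_member(owners, instance_id, new_member, protocol):
    """Return {member_id: error} for a static join; `owners` is updated in place.

    `owners` maps group instance ID -> member ID currently holding it.
    """
    existing = owners.get(instance_id)
    if existing is None or existing == new_member:
        owners[instance_id] = new_member
        return {}
    if protocol == "classic":
        # Classic protocol: the new member takes over, the initial one is fenced.
        owners[instance_id] = new_member
        return {existing: "FENCED_INSTANCE_ID"}
    # New consumer group protocol: the initial member keeps the instance ID.
    return {new_member: "UNRELEASED_INSTANCE_ID"}
```

The question for streams is which of the two branches a streams group should follow when an instance ID is already in use.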

LM7. Regarding the admin tool to manage streams groups. We can discuss
whether to have it here or separately, but I think we should aim for some
basic admin capabilities from the start, mainly because I believe it will
be very helpful/needed in practice during the impl of the KIP. From
experience with KIP-848, we felt a bit blindfolded in the initial phase
where we still didn't have kafka-consumer-groups dealing with the new
groups (and then it was very helpful and used when we were able to easily
inspect them from the console)

LM8. nit: the links to KIP-848 are not quite right (pointing to an
unrelated “Future work section” at the end of KIP-848)

Thanks!

Lianet


On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
<lbruts...@confluent.io.invalid> wrote:

Hi Andrew,

AS2: I added a note for now. If others feel strongly about it, we can
still add more administrative tools to the KIP - it should not change
the overall story significantly.

AS8: "streams.group.assignor.name" sounds good to me to distinguish
the config from class names. Not sure if I like the "default". To be
consistent, we'd then have to call it
`group.streams.default.session.timeout.ms` as well. I only added the
`.name` on both broker and group level for now.

AS10: Ah, I misread your comment, now I know what you meant. Good
point, fixed (by Bruno).

Cheers,
Lucas

On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
<andrew_schofi...@live.com> wrote:

Hi Lucas,
I see that I hit send too quickly. One more comment:

AS2: I think stating that there will be a `kafka-streams-groups.sh` in a
future KIP is fine to keep this KIP focused. Personally, I would probably
put all of the gory details in this KIP, but then it’s not my KIP. A future
pointer is fine too.

Thanks,
Andrew


On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID>
wrote:

Hi Andrew,

thanks for getting the discussion going! Here are my responses.

AS1: Good point, done.

AS2: We were planning to add more administrative tools to the
interface in a follow-up KIP, to not make this KIP too large. If
people think that it would help to understand the overall picture if
we already add something like `kafka-streams-groups.sh`, we will do
that. I also agree that we should address how this relates to
KIP-1043, we'll add it.

AS3: Good idea, that's more consistent with `assigning` and
`reconciling` etc.

AS4: Thanks, Fixed.

AS5: Good catch. This was supposed to mean that we require CREATE on
cluster or CREATE on all topics, not both. Fixed.
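
As a minimal Python sketch of the "either, not both" rule in AS5 (the function and parameter names are hypothetical, and real ACL evaluation is more involved than set membership):

```python
def may_initialize(cluster_create, topic_create, internal_topics):
    """CREATE on the cluster resource, or CREATE on every internal topic.

    `topic_create` is the set of topic names the principal may create.
    If `internal_topics` is empty, the topic check passes trivially.
    """
    if cluster_create:
        return True
    return all(topic in topic_create for topic in internal_topics)
```

For example, a principal without cluster-level CREATE passes only if every changelog and repartition topic of the topology is covered by its topic-level permissions.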

AS6: Thanks, Fixed.

AS7. Thanks, Fixed.

AS8: I think this works a bit differently in this KIP than in consumer
groups. KIP-848 lets the members vote for a preferred assignor, and
the broker-side assignor is picked by majority vote. The
`group.consumer.assignors` specifies the list of assignors that are
supported on the broker, and is configurable because the interface is
pluggable. In this KIP, the task assignor is not voted on by members
but configured on the broker-side. `group.streams.assignor` is used
for this, and uses a specific name. If we make the task assignor
pluggable on the broker-side, we'd introduce a separate config
`group.streams.assignors`, which would indeed be a list of class
names. I think there is no conflict here, the two configurations serve
different purposes.  The only gripe I'd have here is naming as
`group.streams.assignor` and `group.streams.assignors` would be a bit
similar, but I cannot really think of a better name for
`group.streams.assignor`, so I'd probably rather introduce
`group.streams.assignors`  as `group.streams.possible_assignors`  or
something like that.
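
The contrast drawn in AS8 can be sketched in a few lines of Python. This is an illustration under assumptions, not broker code: the function names are hypothetical, and falling back to the first supported assignor when no member votes for a supported one is an assumption made here.

```python
from collections import Counter


def pick_consumer_assignor(votes, supported):
    """Consumer groups: members vote, the most preferred supported assignor wins."""
    counts = Counter(vote for vote in votes if vote in supported)
    if not counts:
        # Assumed fallback when no member names a supported assignor.
        return supported[0]
    return counts.most_common(1)[0][0]


def pick_streams_assignor(broker_config):
    """Streams groups: no vote, the name comes from broker-side configuration."""
    return broker_config["group.streams.assignor"]
```

The first function needs a list of supported assignors (the role of `group.consumer.assignors`), while the second needs only a single configured name, which is why the two configurations serve different purposes.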

AS9: I added explanations for the various record types. Apart from the
new topology record, and the partition metadata (which is based on the
topology and can only be created once we have a topology initialized)
the lifecycle for the records is basically the same as in KIP-848.

AS10: In the consumer offset topic, the version in the key is used to
differentiate different schema types with the same content. So the
keys are not versioned, but the version field is "abused" as a type
tag. This is the same in KIP-848, we followed it for consistency.
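
A minimal Python sketch of the "version as type tag" idea, assuming the serialized key starts with a big-endian 16-bit version field. The tag values and the `AnotherRecordKey` name are hypothetical placeholders; the real numbers are defined by the record schemas.

```python
import struct

# Hypothetical tag values, for illustration only.
RECORD_TYPES = {
    100: "AnotherRecordKey",
    101: "StreamsGroupCurrentMemberAssignmentKey",
}


def record_type(key_bytes):
    """Read the leading 16-bit "version" of a key and use it as a type tag."""
    (tag,) = struct.unpack_from(">h", key_bytes, 0)
    return RECORD_TYPES.get(tag, "unknown")
```

Because the number selects the schema rather than a revision of it, two keys with identical field content but different tags are different record types.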

Cheers,
Lucas


On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
<andrew_schofi...@live.com> wrote:

Hi Lucas and Bruno,

Thanks for the great KIP.

I've read through the document and have some initial comments.

AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration
constant. This is a change to the public interface and should be called out.

AS2: Since streams groups are no longer consumer groups, how does the user
manipulate them, observe lag and so on? Will you add `kafka-streams-groups.sh`
or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can easily
be extended to support streams groups, but that only lets the user see the
groups, not manipulate them.

AS3: I wonder whether the streams group state of UNINITIALIZED would be
better expressed as INITIALIZING.

AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should
be nullable.

AS5: Why does StreamsGroupInitialize require CREATE permission on the
cluster resource? I imagine that this is one of the ways that the request
might be granted permission to create the StateChangelogTopics and
RepartitionSourceTopics, but if it is granted permission to create those
topics with specific ACLs, would CREATE on the cluster resource still be
required?

AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED
and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.

AS7: A tiny nit. You've used TopologyID (capitals) in
StreamsGroupHeartbeatRequest and a few others, but in all other cases the
fields which are ids are spelled Id. I suggest TopologyId.

Also, "interal" is probably meant to be "interval".

AS8: For consumer groups, the `group.consumer.assignors` configuration is a
list of class names. The assignors do have names too, but the configuration
which enables them is in terms of class names. I wonder whether the broker’s
group.streams.assignor could actually be `group.streams.assignors` and
specified as a list of the class names of the supplied assignors. I know
you're not supporting other assignors yet, but when you do, I expect you
would prefer to have used class names from the start.

The use of assignor names in the other places looks good to me.

AS9: I'd find it really helpful to have a bit of a description about the
purpose and lifecycle of the 9 record types you've introduced on the
__consumer_offsets topic. I did a cursory review but without really
understanding what's written when, I can't do a thorough job of reviewing.

AS10: In the definitions of the record keys, such as
StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must
match the versions of the types.

Thanks,
Andrew

On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID>
wrote:

Hi all,

I would like to start a discussion thread on KIP-1071: Streams
Rebalance Protocol. With this KIP, we aim to bring the principles laid
down by KIP-848 to Kafka Streams, to make rebalances more reliable and
scalable, and make Kafka Streams overall easier to deploy and operate.
The KIP proposes moving the assignment logic to the broker, and
introducing a dedicated group type and dedicated RPCs for Kafka
Streams.

The KIP is here:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol

This is joint work with Bruno Cadonna.

Please take a look and let us know what you think.

Best,
Lucas



