Hi Bruno,
Thanks for adding the detail on the schemas of the records written to
__consumer_offsets. I’ve reviewed them in detail and they look good to me.
I have one naive question.

AS11: I notice that an assignment is essentially a set of partition indices
for subtopologies. Since a subtopology can be defined by a source topic regex,
does this mean that an assignment gives the same set of partition indices for
all topics which happen to match the regex? So, a subtopology reading from A*
that matches AB and AC would give the same set of partitions to each task for
both topics, and would not be able to give AB:0 to one task and AC:0 to a
different task. Is this correct?
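
To make the question concrete, here is a minimal Python sketch of the reading
described above. It is not the KIP's data model: the function name, the
dictionary layout and the subtopology ID are hypothetical, and the pattern is
written as the regex "A.*" on the assumption that "A*" above is meant as a
prefix match.

```python
import re

# Hypothetical model: an assignment maps a subtopology ID to partition
# indices only (no topic names), as the question above assumes.
def expand_assignment(assignment, source_topic_regex, topics):
    """Expand {subtopology_id: [partition, ...]} into (topic, partition) pairs."""
    expanded = {}
    for subtopology_id, partitions in assignment.items():
        pattern = re.compile(source_topic_regex[subtopology_id])
        # Every topic matching the regex receives the same partition indices.
        matching = sorted(t for t in topics if pattern.fullmatch(t))
        expanded[subtopology_id] = [(t, p) for t in matching for p in partitions]
    return expanded

topics = ["AB", "AC", "B"]
regexes = {"subtopology-0": "A.*"}
member_1 = expand_assignment({"subtopology-0": [0]}, regexes, topics)
member_2 = expand_assignment({"subtopology-0": [1]}, regexes, topics)
print(member_1)  # {'subtopology-0': [('AB', 0), ('AC', 0)]}
```

Under this reading, AB:0 and AC:0 always travel together, so no assignment of
this shape can place them on different members.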

Thanks,
Andrew

> On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi Lianet,
>
> Thanks for the review!
>
> Here are my answers:
>
> LM1. Is your question whether we need to send a full heartbeat each time the 
> member re-joins the group even if the information in the RPC did not change 
> since the last heartbeat?
>
> LM2. Is the reason for sending the instance ID each time that a member could
> shut down, change the instance ID and then start and heartbeat again, but the
> group coordinator would never notice that the instance ID changed?
>
> LM3. I see your point. I am wondering whether this additional information is 
> worth the dependency between the group types. To return INVALID_GROUP_TYPE, 
> the group coordinator needs to know that a group ID exists with a different 
> group type. With a group coordinator as we have it now in Apache Kafka that 
> manages all group types, that is not a big deal, but imagine if we (or some 
> implementation of the Apache Kafka protocol) decide to have a separate group 
> coordinator for each group type.
>
> LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to me. I am
> going to change that.
>
> LM5. I think there is a dependency from the StreamsGroupInitialize RPC to the 
> heartbeat. The group must exist when the initialize RPC is received by the 
> group coordinator. The group is created by the heartbeat RPC. I would be in 
> favor of making the initialize RPC independent of the heartbeat RPC. That
> would allow a streams group to be initialized explicitly with an admin tool.
>
> LM6. I think it affects streams, and streams should behave the same as the
> consumer group.
>
> LM7. Good point that we will consider.
>
> LM8. Fixed! Thanks!
>
>
> Best,
> Bruno
>
>
>
>
> On 7/19/24 9:53 PM, Lianet M. wrote:
>> Hi Lucas/Bruno, thanks for the great KIP! First comments:
>> LM1. Related to where the KIP says: “Group ID, member ID, member epoch
>> are sent with each heartbeat request. Any other information that has not
>> changed since the last heartbeat can be omitted.” I expect all the other
>> info also needs to be sent whenever a full heartbeat is required (even if
>> it didn’t change from the last heartbeat), e.g. in fencing scenarios,
>> correct?
>> LM2. For consumer groups we always send the groupInstanceId (if any) as
>> part of every heartbeat, along with memberId, epoch and groupId. Should we
>> consider that too here?
>> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to
>> the stream-specific RPCs if the groupId is associated with a group type
>> that is not streams (i.e. consumer group or share group). I wonder if at
>> this point, where we're getting several new group types added, each with
>> RPCs that are supposed to include groupId of a certain type, we should be
>> more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE
>> (group exists but not with a valid type for this RPC) vs a
>> GROUP_ID_NOT_FOUND (group does not exist).  Those errors would be
>> consistently used across consumer, share, and streams RPCs whenever the
>> group id is not of the expected type.
>> This is truly not specific to this KIP, and should be addressed with all
>> group types and their RPCs in mind. I just wanted to bring out my concern
>> and get thoughts around it.
>> LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if
>> groupId is empty. There is already an INVALID_GROUP_ID error, that seems
>> more specific to this situation. Error handling of specific errors would
>> definitely be easier than having to deal with a generic INVALID_REQUEST
>> (and probably its custom message). I know that for KIP-848 we have
>> INVALID_REQUEST for similar situations, so if we ever go down this path
>> we should review it there too for consistency. Thoughts?
>> LM5. The dependency between the StreamsGroupHeartbeat RPC and the
>> StreamsGroupInitialize RPC is one-way only, right? HB requires a previous
>> StreamsGroupInitialize request, but StreamsGroupInitialize processing is
>> totally independent of heartbeats (and could perfectly be processed without
>> a previous HB, even though the client implementation we’re proposing won’t
>> go down that path). Is my understanding correct? Just to double check,
>> seems sensible like that at the protocol level.
>> LM6. With KIP-848, there is an important improvement that brings a
>> difference in behaviour around the static membership: with the classic
>> protocol, if a static member joins with a group instance already in use, it
>> makes the initial member fail with a FENCED_INSTANCE_ID exception, vs.
>> with the new consumer group protocol, the second member trying to join
>> fails with an UNRELEASED_INSTANCE_ID. Does this change need to be
>> considered in any way for the streams app? (I'm not familiar with KS yet,
>> but thought it was worth asking. If it doesn't affect it in any way, it may
>> still be helpful to call it out in a section for static membership)
>> LM7. Regarding the admin tool to manage streams groups. We can discuss
>> whether to have it here or separately, but I think we should aim for some
>> basic admin capabilities from the start, mainly because I believe it will
>> be very helpful/needed in practice during the impl of the KIP. From
>> experience with KIP-848, we felt a bit blindfolded in the initial phase
>> where we still didn't have kafka-consumer-groups dealing with the new
>> groups (and then it was very helpful and used when we were able to easily
>> inspect them from the console)
>> LM8. nit: the links to KIP-848 are not quite right (pointing to an
>> unrelated “Future work section” at the end of KIP-848)
>> Thanks!
>> Lianet
>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
>> <lbruts...@confluent.io.invalid> wrote:
>>> Hi Andrew,
>>>
>>> AS2: I added a note for now. If others feel strongly about it, we can
>>> still add more administrative tools to the KIP - it should not change
>>> the overall story significantly.
>>>
>>> AS8: "streams.group.assignor.name" sounds good to me to distinguish
>>> the config from class names. Not sure if I like the "default". To be
>>> consistent, we'd then have to call it
>>> `group.streams.default.session.timeout.ms` as well. I only added the
>>> `.name` on both broker and group level for now.
>>>
>>> AS10: Ah, I misread your comment, now I know what you meant. Good
>>> point, fixed (by Bruno).
>>>
>>> Cheers,
>>> Lucas
>>>
>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
>>> <andrew_schofi...@live.com> wrote:
>>>>
>>>> Hi Lucas,
>>>> I see that I hit send too quickly. One more comment:
>>>>
>>>> AS2: I think stating that there will be a `kafka-streams-groups.sh` in a
>>>> future KIP is fine to keep this KIP focused. Personally, I would probably
>>>> put all of the gory details in this KIP, but then it’s not my KIP. A
>>>> future pointer is fine too.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>
>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> thanks for getting the discussion going! Here are my responses.
>>>>>
>>>>> AS1: Good point, done.
>>>>>
>>>>> AS2: We were planning to add more administrative tools to the
>>>>> interface in a follow-up KIP, to not make this KIP too large. If
>>>>> people think that it would help to understand the overall picture if
>>>>> we already add something like `kafka-streams-groups.sh`, we will do
>>>>> that. I also agree that we should address how this relates to
>>>>> KIP-1043, we'll add it.
>>>>>
>>>>> AS3: Good idea, that's more consistent with `assigning` and
>>>>> `reconciling` etc.
>>>>>
>>>>> AS4: Thanks, Fixed.
>>>>>
>>>>> AS5: Good catch. This was supposed to mean that we require CREATE on
>>>>> cluster or CREATE on all topics, not both. Fixed.
>>>>>
>>>>> AS6: Thanks, Fixed.
>>>>>
>>>>> AS7. Thanks, Fixed.
>>>>>
>>>>> AS8: I think this works a bit differently in this KIP than in consumer
>>>>> groups. KIP-848 lets the members vote for a preferred assignor, and
>>>>> the broker-side assignor is picked by majority vote. The
>>>>> `group.consumer.assignors` specifies the list of assignors that are
>>>>> supported on the broker, and is configurable because the interface is
>>>>> pluggable. In this KIP, the task assignor is not voted on by members
>>>>> but configured on the broker-side. `group.streams.assignor` is used
>>>>> for this, and uses a specific name. If we make the task assignor
>>>>> pluggable on the broker-side, we'd introduce a separate config
>>>>> `group.streams.assignors`, which would indeed be a list of class
>>>>> names. I think there is no conflict here, the two configurations serve
>>>>> different purposes.  The only gripe I'd have here is naming as
>>>>> `group.streams.assignor` and `group.streams.assignors` would be a bit
>>>>> similar, but I cannot really think of a better name for
>>>>> `group.streams.assignor`, so I'd probably rather introduce
>>>>> `group.streams.assignors`  as `group.streams.possible_assignors`  or
>>>>> something like that.
>>>>>
>>>>> AS9: I added explanations for the various record types. Apart from the
>>>>> new topology record, and the partition metadata (which is based on the
>>>>> topology and can only be created once we have a topology initialized)
>>>>> the lifecycle for the records is basically the same as in KIP-848.
>>>>>
>>>>> AS10: In the consumer offset topic, the version in the key is used to
>>>>> differentiate different schema types with the same content. So the
>>>>> keys are not versioned, but the version field is "abused" as a type
>>>>> tag. This is the same in KIP-848, we followed it for consistency.
>>>>>
>>>>> Cheers,
>>>>> Lucas
>>>>>
>>>>>
>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
>>>>> <andrew_schofi...@live.com> wrote:
>>>>>>
>>>>>> Hi Lucas and Bruno,
>>>>>>
>>>>>> Thanks for the great KIP.
>>>>>>
>>>>>> I've read through the document and have some initial comments.
>>>>>>
>>>>>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS
>>>>>> enumeration constant. This is a change to the public interface and
>>>>>> should be called out.
>>>>>>
>>>>>> AS2: Since streams groups are no longer consumer groups, how does the
>>>>>> user manipulate them, observe lag and so on? Will you add
>>>>>> `kafka-streams-groups.sh` or extend `kafka-streams-application-reset.sh`?
>>>>>> Of course, KIP-1043 can easily be extended to support streams groups,
>>>>>> but that only lets the user see the groups, not manipulate them.
>>>>>>
>>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED would
>>>>>> be better expressed as INITIALIZING.
>>>>>>
>>>>>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex
>>>>>> should be nullable.
>>>>>>
>>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission on the
>>>>>> cluster resource? I imagine that this is one of the ways that the
>>>>>> request might be granted permission to create the StateChangelogTopics
>>>>>> and RepartitionSourceTopics, but if it is granted permission to create
>>>>>> those topics with specific ACLs, would CREATE on the cluster resource
>>>>>> still be required?
>>>>>>
>>>>>> AS6: StreamsGroupInitialize can also fail with
>>>>>> TOPIC_AUTHORIZATION_FAILED and (subject to AS5)
>>>>>> CLUSTER_AUTHORIZATION_FAILED.
>>>>>>
>>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in
>>>>>> StreamsGroupHeartbeatRequest and a few others, but in all other cases
>>>>>> the fields which are ids are spelled Id. I suggest TopologyId.
>>>>>>
>>>>>> Also, "interal" is probably meant to be "interval".
>>>>>>
>>>>>> AS8: For consumer groups, the `group.consumer.assignors` configuration
>>>>>> is a list of class names. The assignors do have names too, but the
>>>>>> configuration which enables them is in terms of class names. I wonder
>>>>>> whether the broker’s `group.streams.assignor` could actually be
>>>>>> `group.streams.assignors` and specified as a list of the class names of
>>>>>> the supplied assignors. I know you're not supporting other assignors
>>>>>> yet, but when you do, I expect you would prefer to have used class
>>>>>> names from the start.
>>>>>>
>>>>>> The use of assignor names in the other places looks good to me.
>>>>>>
>>>>>> AS9: I'd find it really helpful to have a bit of a description about
>>>>>> the purpose and lifecycle of the 9 record types you've introduced on
>>>>>> the __consumer_offsets topic. I did a cursory review but without really
>>>>>> understanding what's written when, I can't do a thorough job of
>>>>>> reviewing.
>>>>>>
>>>>>> AS10: In the definitions of the record keys, such as
>>>>>> StreamsGroupCurrentMemberAssignmentKey, the versions of the fields
>>>>>> must match the versions of the types.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I would like to start a discussion thread on KIP-1071: Streams
>>>>>>> Rebalance Protocol. With this KIP, we aim to bring the principles
>>>>>>> laid down by KIP-848 to Kafka Streams, to make rebalances more
>>>>>>> reliable and scalable, and make Kafka Streams overall easier to
>>>>>>> deploy and operate. The KIP proposes moving the assignment logic to
>>>>>>> the broker, and introducing a dedicated group type and dedicated RPCs
>>>>>>> for Kafka Streams.
>>>>>>>
>>>>>>> The KIP is here:
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
>>>>>>>
>>>>>>> This is joint work with Bruno Cadonna.
>>>>>>>
>>>>>>> Please take a look and let us know what you think.
>>>>>>>
>>>>>>> Best,
>>>>>>> Lucas