Hi Bruno,

Thanks for adding the detail on the schemas on records written to __consumer_offsets. I’ve reviewed them in detail and they look good to me. I have one naive question.

AS11: I notice that an assignment is essentially a set of partition indices for subtopologies. Since a subtopology can be defined by a source topic regex, does this mean that an assignment gives the same set of partition indices for all topics which happen to match the regex? So, a subtopology reading from A* that matches AB and AC would give the same set of partitions to each task for both topics, and is not able to give AB:0 to one task and AC:0 to a different task. Is this correct?

Thanks,
Andrew

> On 23 Jul 2024, at 16:16, Bruno Cadonna <cado...@apache.org> wrote:
>
> Hi Lianet,
>
> Thanks for the review!
>
> Here are my answers:
>
> LM1. Is your question whether we need to send a full heartbeat each time the
> member re-joins the group even if the information in the RPC did not change
> since the last heartbeat?
>
> LM2. Is the reason for sending the instance ID each time that a member could
> shutdown, change the instance ID and then start and heartbeat again, but the
> group coordinator would never notice that the instance ID changed?
>
> LM3. I see your point. I am wondering whether this additional information is
> worth the dependency between the group types. To return INVALID_GROUP_TYPE,
> the group coordinator needs to know that a group ID exists with a different
> group type. With a group coordinator as we have it now in Apache Kafka that
> manages all group types, that is not a big deal, but imagine if we (or some
> implementation of the Apache Kafka protocol) decide to have a separate group
> coordinator for each group type.
>
> LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to me. I
> am going to change that.
>
> LM5. I think there is a dependency from the StreamsGroupInitialize RPC to the
> heartbeat. The group must exist when the initialize RPC is received by the
> group coordinator. The group is created by the heartbeat RPC. I would be in
> favor of making the initialize RPC independent from the heartbeat RPC.
> That would allow initializing a streams group explicitly with an admin tool.
>
> LM6. I think it affects streams and streams should behave as the consumer
> group.
>
> LM7. Good point that we will consider.
>
> LM8. Fixed! Thanks!
>
>
> Best,
> Bruno
>
>
> On 7/19/24 9:53 PM, Lianet M. wrote:
>> Hi Lucas/Bruno, thanks for the great KIP! First comments:
>> LM1. Related to where the KIP says: “Group ID, member ID, member epoch
>> are sent with each heartbeat request. Any other information that has not
>> changed since the last heartbeat can be omitted.” I expect all the other
>> info also needs to be sent whenever a full heartbeat is required (even if
>> it didn’t change from the last heartbeat), ex. on fencing scenarios,
>> correct?
>> LM2. For consumer groups we always send the groupInstanceId (if any) as
>> part of every heartbeat, along with memberId, epoch and groupId. Should we
>> consider that too here?
>> LM3. We’re proposing returning a GROUP_ID_NOT_FOUND error in response to
>> the stream-specific RPCs if the groupId is associated with a group type
>> that is not streams (ie. consumer group or share group). I wonder if at
>> this point, where we're getting several new group types added, each with
>> RPCs that are supposed to include groupId of a certain type, we should be
>> more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE
>> (group exists but not with a valid type for this RPC) vs a
>> GROUP_ID_NOT_FOUND (group does not exist). Those errors would be
>> consistently used across consumer, share, and streams RPCs whenever the
>> group id is not of the expected type.
>> This is truly not specific to this KIP, and should be addressed with all
>> group types and their RPCs in mind. I just wanted to bring out my concern
>> and get thoughts around it.
>> LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if
>> groupId is empty.
>> There is already an INVALID_GROUP_ID error, which seems
>> more specific to this situation. Error handling of specific errors would
>> definitely be easier than having to deal with a generic INVALID_REQUEST
>> (and probably its custom message). I know that for KIP-848 we have
>> INVALID_REQUEST for similar situations, so if we ever go down this path
>> we should review it there too for consistency. Thoughts?
>> LM5. The dependency between the StreamsGroupHeartbeat RPC and the
>> StreamsGroupInitialize RPC is one-way only, right? HB requires a previous
>> StreamsGroupInitialize request, but StreamsGroupInitialize processing is
>> totally independent of heartbeats (and could perfectly be processed without
>> a previous HB, even though the client implementation we’re proposing won’t
>> go down that path). Is my understanding correct? Just to double check,
>> seems sensible like that at the protocol level.
>> LM6. With KIP-848, there is an important improvement that brings a
>> difference in behaviour around the static membership: with the classic
>> protocol, if a static member joins with a group instance already in use, it
>> makes the initial member fail with a FENCED_INSTANCE_ID exception, vs.
>> with the new consumer group protocol, the second member trying to join
>> fails with an UNRELEASED_INSTANCE_ID. Does this change need to be
>> considered in any way for the streams app? (I'm not familiar with KS yet,
>> but thought it was worth asking. If it doesn't affect it in any way, still
>> maybe helpful to call it out on a section for static membership)
>> LM7. Regarding the admin tool to manage streams groups. We can discuss
>> whether to have it here or separately, but I think we should aim for some
>> basic admin capabilities from the start, mainly because I believe it will
>> be very helpful/needed in practice during the impl of the KIP.
>> From experience with KIP-848, we felt a bit blindfolded in the initial phase
>> where we still didn't have kafka-consumer-groups dealing with the new
>> groups (and then it was very helpful and used when we were able to easily
>> inspect them from the console)
>> LM8. nit: the links to KIP-848 are not quite right (pointing to an
>> unrelated “Future work section” at the end of KIP-848)
>> Thanks!
>> Lianet
>>
>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy
>> <lbruts...@confluent.io.invalid> wrote:
>>> Hi Andrew,
>>>
>>> AS2: I added a note for now. If others feel strongly about it, we can
>>> still add more administrative tools to the KIP - it should not change
>>> the overall story significantly.
>>>
>>> AS8: "streams.group.assignor.name" sounds good to me to distinguish
>>> the config from class names. Not sure if I like the "default". To be
>>> consistent, we'd then have to call it
>>> `group.streams.default.session.timeout.ms` as well. I only added the
>>> `.name` on both broker and group level for now.
>>>
>>> AS10: Ah, I misread your comment, now I know what you meant. Good
>>> point, fixed (by Bruno).
>>>
>>> Cheers,
>>> Lucas
>>>
>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield
>>> <andrew_schofi...@live.com> wrote:
>>>>
>>>> Hi Lucas,
>>>> I see that I hit send too quickly. One more comment:
>>>>
>>>> AS2: I think stating that there will be a `kafka-streams-groups.sh` in a
>>>> future KIP is fine to keep this KIP focused. Personally, I would probably
>>>> put all of the gory details in this KIP, but then it’s not my KIP. A future
>>>> pointer is fine too.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>
>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
>>>>>
>>>>> Hi Andrew,
>>>>>
>>>>> thanks for getting the discussion going! Here are my responses.
>>>>>
>>>>> AS1: Good point, done.
>>>>>
>>>>> AS2: We were planning to add more administrative tools to the
>>>>> interface in a follow-up KIP, to not make this KIP too large. If
>>>>> people think that it would help to understand the overall picture if
>>>>> we already add something like `kafka-streams-groups.sh`, we will do
>>>>> that. I also agree that we should address how this relates to
>>>>> KIP-1043, we'll add it.
>>>>>
>>>>> AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc.
>>>>>
>>>>> AS4: Thanks, Fixed.
>>>>>
>>>>> AS5: Good catch. This was supposed to mean that we require CREATE on
>>>>> cluster or CREATE on all topics, not both. Fixed.
>>>>>
>>>>> AS6: Thanks, Fixed.
>>>>>
>>>>> AS7. Thanks, Fixed.
>>>>>
>>>>> AS8: I think this works a bit differently in this KIP than in consumer
>>>>> groups. KIP-848 lets the members vote for a preferred assignor, and
>>>>> the broker-side assignor is picked by majority vote. The
>>>>> `group.consumer.assignors` specifies the list of assignors that are
>>>>> supported on the broker, and is configurable because the interface is
>>>>> pluggable. In this KIP, the task assignor is not voted on by members
>>>>> but configured on the broker-side. `group.streams.assignor` is used
>>>>> for this, and uses a specific name. If we make the task assignor
>>>>> pluggable on the broker-side, we'd introduce a separate config
>>>>> `group.streams.assignors`, which would indeed be a list of class
>>>>> names. I think there is no conflict here, the two configurations serve
>>>>> different purposes. The only gripe I'd have here is naming as
>>>>> `group.streams.assignor` and `group.streams.assignors` would be a bit
>>>>> similar, but I cannot really think of a better name for
>>>>> `group.streams.assignor`, so I'd probably rather introduce
>>>>> `group.streams.assignors` as `group.streams.possible_assignors` or
>>>>> something like that.
>>>>>
>>>>> AS9: I added explanations for the various record types.
>>>>> Apart from the
>>>>> new topology record, and the partition metadata (which is based on the
>>>>> topology and can only be created once we have a topology initialized)
>>>>> the lifecycle for the records is basically identical to that in KIP-848.
>>>>>
>>>>> AS10: In the consumer offset topic, the version in the key is used to
>>>>> differentiate different schema types with the same content. So the
>>>>> keys are not versioned, but the version field is "abused" as a type
>>>>> tag. This is the same in KIP-848, we followed it for consistency.
>>>>>
>>>>> Cheers,
>>>>> Lucas
>>>>>
>>>>>
>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield
>>>>> <andrew_schofi...@live.com> wrote:
>>>>>>
>>>>>> Hi Lucas and Bruno,
>>>>>>
>>>>>> Thanks for the great KIP.
>>>>>>
>>>>>> I've read through the document and have some initial comments.
>>>>>>
>>>>>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration
>>>>>> constant. This is a change to the public interface and should be called out.
>>>>>>
>>>>>> AS2: Since streams groups are no longer consumer groups, how does the user
>>>>>> manipulate them, observe lag and so on? Will you add `kafka-streams-groups.sh`
>>>>>> or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can easily
>>>>>> be extended to support streams groups, but that only lets the user see the
>>>>>> groups, not manipulate them.
>>>>>>
>>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED would be
>>>>>> better expressed as INITIALIZING.
>>>>>>
>>>>>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should
>>>>>> be nullable.
>>>>>>
>>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission on the
>>>>>> cluster resource?
>>>>>> I imagine that this is one of the ways that the request might
>>>>>> be granted permission to create the StateChangelogTopics and
>>>>>> RepartitionSourceTopics, but if it is granted permission to create those topics
>>>>>> with specific ACLs, would CREATE on the cluster resource still be required?
>>>>>>
>>>>>> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED
>>>>>> and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
>>>>>>
>>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in StreamsGroupHeartbeatRequest
>>>>>> and a few others, but in all other cases the fields which are ids are spelled Id.
>>>>>> I suggest TopologyId.
>>>>>>
>>>>>> Also, "interal" is probably meant to be "interval".
>>>>>>
>>>>>> AS8: For consumer groups, the `group.consumer.assignors` configuration is a
>>>>>> list of class names. The assignors do have names too, but the configuration which
>>>>>> enables them is in terms of class names. I wonder whether the broker’s
>>>>>> group.streams.assignor could actually be `group.streams.assignors` and specified
>>>>>> as a list of the class names of the supplied assignors. I know you're not supporting
>>>>>> other assignors yet, but when you do, I expect you would prefer to have used class
>>>>>> names from the start.
>>>>>>
>>>>>> The use of assignor names in the other places looks good to me.
>>>>>>
>>>>>> AS9: I'd find it really helpful to have a bit of a description about the purpose and
>>>>>> lifecycle of the 9 record types you've introduced on the __consumer_offsets topic.
>>>>>> I did a cursory review but without really understanding what's written when,
>>>>>> I can't do a thorough job of reviewing.
>>>>>>
>>>>>> AS10: In the definitions of the record keys, such as
>>>>>> StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must
>>>>>> match the versions of the types.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy
>>>>>>> <lbruts...@confluent.io.INVALID> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I would like to start a discussion thread on KIP-1071: Streams
>>>>>>> Rebalance Protocol. With this KIP, we aim to bring the principles laid
>>>>>>> down by KIP-848 to Kafka Streams, to make rebalances more reliable and
>>>>>>> scalable, and make Kafka Streams overall easier to deploy and operate.
>>>>>>> The KIP proposes moving the assignment logic to the broker, and
>>>>>>> introducing a dedicated group type and dedicated RPCs for Kafka
>>>>>>> Streams.
>>>>>>>
>>>>>>> The KIP is here:
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
>>>>>>>
>>>>>>> This is joint work with Bruno Cadonna.
>>>>>>>
>>>>>>> Please take a look and let us know what you think.
>>>>>>>
>>>>>>> Best,
>>>>>>> Lucas
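
To make the AS11 question at the top of this thread concrete, here is a minimal Python sketch of the reading it describes. It assumes that an assignment carries only partition indices per subtopology and that those indices apply to every topic matching the subtopology's source topic regex. The function name `expand_assignment` and the regex `A.*` (standing in for the informal `A*`) are hypothetical and not taken from the KIP; whether the protocol really behaves this way is exactly what AS11 asks.

```python
import re


def expand_assignment(source_topic_regex, topics, partition_indices):
    """Expand partition indices to (topic, partition) pairs.

    Hypothetical helper: every topic that fully matches the regex
    receives the same set of partition indices.
    """
    pattern = re.compile(source_topic_regex)
    matching = [t for t in topics if pattern.fullmatch(t)]
    return {(t, p) for t in matching for p in partition_indices}


topics = ["AB", "AC", "XY"]

# One subtopology reading from A.*; two members with different indices.
member_1 = expand_assignment("A.*", topics, {0})
member_2 = expand_assignment("A.*", topics, {1})

print(sorted(member_1))
print(sorted(member_2))
```

Under this reading, the member holding index 0 gets both AB:0 and AC:0, so the two cannot be split across different tasks.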