Hey guys -- sorry I'm late to the party, I'm still going over some things and don't have everything I want to say ready just yet, but I figured that shouldn't stop me from starting with the questions/comments that are ready to go. So here's my first set of feedback:

S1. Can you clarify which client/thread is intended to use each of the RPCs? Some are obvious, like the HB request/response, but others seem like they could be sent by the main consumer thread, the background hb thread, an Admin client, or maybe even more than one of these. For example, who sends out the StreamsGroupInitialize? And the StreamsGroupDescribe?

S2. While I’m definitely in favor of increasing the max.warmup.replicas beyond 2 (and would probably go beyond 4 even), 10,000 or even 100 feels way too big. The problem isn’t so much that it “slows down the assignment”; rather, it's a tradeoff between warmup speed and resource consumption. The total resource consumption is obviously the same no matter how many warmups you can have at a time, but the maximum point-in-time resource consumption needs to be considered. If you have 10,000 warmup replicas at a time, on top of your usual standby and active tasks, you’ll finish warming up everything faster but will see much higher broker traffic and potentially get your app and/or cluster throttled. It also puts strain on the application nodes -- if you have 100 warmup tasks per thread (and potentially multiple threads per app), you are very likely to exceed the memory and/or disk available to that node. That can lead to a cycle of crashing and rebalancing, which puts even more load on the brokers.

So I'd be a bit more conservative and go with maybe 20 or 30. Tbh very few apps will even have that many warmup tasks to be assigned at a time anyways so it's probably not going to matter, but best to avoid a crazy app with hundreds of tasks trying to put them all on one node after a restart or something.

S3. The StreamsHeartbeatRequest has this field and description:

> “ShutdownApplication”: Whether all Streams clients in the group should
> shut down. Can only be defined if memberEpoch = -1

When and for what is this used?
Is it supposed to be a new way to implement the SHUTDOWN_APPLICATION response from the StreamsExceptionHandler, and if so, why can the field only be defined “if memberEpoch = -1”?

S4. It seems like it's been decided that no new tools will be included in this KIP and we will just update the streams-application-reset tool to work with the new Streams group type. However, quite a few new tools and AdminClient RPCs have been proposed throughout the discussion. Even though we ended up scrapping everything, I think at a minimum we should list the possible new tools that can be added in future KIPs, as well as the AdminClient RPCs that will no longer work and will eventually need a new Streams group version. For example, in a previous email Lucas had mentioned adding "extensions of the Admin API for listing, deleting, describing groups and listing, altering and deleting offsets for streams groups." Similarly, during the last Streams sync we had discussed including a new tool to list all Streams groups along with some info such as the client version, topology ID, etc. Even if we don't implement any of these right away, we should mention them in the KIP so that it's clear what tooling is and is not going to be available at the start.

S5. On that note, I was wondering how we intend to update the streams-application-reset tool to work with Streams groups if there isn't any new RPC for "altering and deleting offsets".

S6. Since you can have an empty Streams group, I was wondering if the application-reset tool would actually delete the group or just reset offsets and leave the empty group behind? I think it would be useful to at least have the option to manually delete the group, in case you wanted to deploy a new app with the same group id.

S7. The KIP still includes a new StreamsGroupDescribe RPC but contains very little additional information or context on how, where, and when this RPC will be used.
Per my S1 question about who is sending this RPC, if it's for the AdminClient then don't we need an Admin API? Imo even if it's meant to be used by the Streams client and/or hb thread, if we're adding this RPC anyways then it feels like a small extra step to just package it up with an Admin API and possibly even a small cli tool. But I won't insist on adding anything to an already quite large KIP (though please at least clarify the purpose of this RPC in the doc).

S8. I see that Guozhang brought this up here already, but I'd like to continue the discussion on the topology ID and initialization/upgrade protocol. First, I'm wondering if you have given any thought to what I proposed during the last Streams sync: that is, dropping the automatic topology upgrade feature from this KIP and classifying it as a feature/improvement for a future KIP rather than an essential function of the fundamental Streams rebalance protocol. Obviously this would mean embracing one of these options:

A) we simply don't support changing/upgrading your topology in the initial implementation of the new Streams protocol
B) add an API for manually initiating a topology upgrade (can be a KafkaStreams method or Admin API or a cli tool)

To be honest I think option A) is actually fairly reasonable for a v0 implementation. Personally I would consider some of the tooling mentioned earlier more essential than the ability to upgrade a topology, so if we're going to cut the tooling from scope then imo it's ok to cut the automatic topology upgrade feature as well. Thoughts?

That said, obviously option B) is preferable if we're willing to pull it into the scope. We don't necessarily even need to add any additional RPCs and can just piggyback on the StreamsGroupInitialize RPC since that already does basically the exact same thing. In other words, I'd just add a public interface that triggers a StreamsGroupInitialize from an API that's accessible by the user.
This has another advantage in that it could be used to create an empty group and let the broker begin the initialization process before starting up any of the KafkaStreams clients, although we wouldn't require this and would still have the automatic initialization procedure based on the first hb as described in the KIP.

Anyways, the idea here is to give advanced users full control over the topology version of the group. Since topology upgrades are generally a fairly advanced topic and should always be performed with great care, it feels right to require an explicit request from the user rather than trying to infer the latest topology version in the automatic topology upgrade process outlined in the KIP. I just feel that the complexity, risks, and edge cases of this process are too great to try and force into the scope of the basic proposal, especially to support a fairly rare and already-risky action such as upgrading a topology. Better to punt to a future KIP focused on this feature alone, where we can discuss it in detail without delaying the protocol itself. Just my two cents :)

S9. On a related note, I had some thoughts on the topology ID generation which was touched on earlier in this thread. I'd like to advocate against making things too strict by doing something like hashing the Topology#describe string, since that would prevent some kinds of topology variances that are supported (or at least viable) to run today. For example, one could theoretically insert additional processor nodes into the topology executed on one client but not another, eg a 'peek' that does some logging for debugging purposes. You can also give the processors different names on different clients. These differences would result in different topology IDs if they were just hashes of the Topology#describe output.
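
As a rough illustration of that difference, here is a minimal Python sketch. It is only a sketch under stated assumptions: the describe strings, the `source_topics`/`sink_topics` field names, and the use of SHA-256 are all made up for the example and are not taken from the KIP or from Kafka Streams. It compares hashing a describe-style string against hashing only the subtopology and topic info that an assignment would depend on.

```python
import hashlib
import json


def describe_hash(describe_output: str) -> str:
    """Strict variant: hash the full describe-style string."""
    return hashlib.sha256(describe_output.encode("utf-8")).hexdigest()


def assignment_hash(subtopologies: dict) -> str:
    """Lenient variant: hash only subtopology IDs and their topic lists."""
    # Sort topic lists and keys so that ordering differences do not matter.
    canonical = {
        sub_id: {kind: sorted(topics) for kind, topics in info.items()}
        for sub_id, info in subtopologies.items()
    }
    encoded = json.dumps(canonical, sort_keys=True).encode("utf-8")
    return hashlib.sha256(encoded).hexdigest()


# Hypothetical describe output; client B adds a 'peek' node for debug logging.
describe_a = (
    "Sub-topology: 0\n"
    "  Source: src (topics: [orders])\n"
    "  Sink: out (topic: totals)\n"
)
describe_b = (
    "Sub-topology: 0\n"
    "  Source: src (topics: [orders])\n"
    "  Processor: debug-peek\n"
    "  Sink: out (topic: totals)\n"
)

# Both clients read and write the same topics, so the assignment-relevant
# structure is identical (field names are hypothetical).
structure_a = {"0": {"source_topics": ["orders"], "sink_topics": ["totals"]}}
structure_b = {"0": {"sink_topics": ["totals"], "source_topics": ["orders"]}}

print(describe_hash(describe_a) == describe_hash(describe_b))        # False
print(assignment_hash(structure_a) == assignment_hash(structure_b))  # True
```

With the strict variant the two clients would be treated as running different topologies even though the broker would compute the same tasks for both.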
So I'd propose going with the most bare-bones default implementation, which generates a topology ID (eg by hashing) based only on what the broker actually needs to care about in order to perform an assignment. In other words, hashing just the contents of the "Topology" field in the StreamsGroupInitializeRequest (eg Subtopology and topic info).

S10. Lastly, one final note about the topology upgrades, in case we decide to keep this feature in the KIP. It seems like the general idea is that after a topology ID change, nodes on the older ID will continue processing their assigned tasks but won't receive new ones until they upgrade. Nodes with the upgraded topology ID will receive new tasks. In the example included in the KIP, the assignor simply removes some tasks from the older nodes and moves them to the upgraded node. But this seems to only work if the topology change does not affect the structure of the topology and the set of tasks remains the same. What if the new topology has completely different tasks and input topics for those tasks? Or what if subtopology #1 is removed and all the task ids are shifted down by one -- in this case you could have an older node that keeps its assigned task 1_1 and gives up task 0_1, which is assigned to the upgraded node. In the new topology of the upgraded node, subtopology 0 is actually the same as subtopology 1 in the old topology. This means two clients are now attempting to process tasks which have the same input topics and would presumably be overwriting each other's committed offsets, which is not good. I'm not sure what the best way to handle this situation is, but we definitely need some more details about how tasks are assigned when in the middle of an upgrade.

Ok, that's it for now! Very excited for this KIP :D

On Mon, Aug 26, 2024 at 1:11 AM Bruno Cadonna <br...@cadonna.name> wrote:

> Hi Nick,
>
> NT4.
> I agree with you that it is more correct to use the most recent offsets.
> Although, as you already pointed out, a difference between most recent
> and max should be rare.
>
> Best,
> Bruno
>
> On 8/21/24 7:02 PM, Nick Telford wrote:
> > Hi Lucas,
> >
> > NT4.
> > Sounds good, although should it take the maximum offsets? Wouldn't it be
> > more correct to take the *most recent* offsets? (i.e. the offsets from the
> > more recently received heartbeat)
> > My thinking is that it might be possible (albeit exceptionally rare) for
> > the on-disk offsets to revert to a previous number, and taking the max
> > would incorrectly assume the older offsets are correct.
> >
> > Regards,
> > Nick
> >
> > On Mon, 19 Aug 2024 at 15:00, Lucas Brutschy <lbruts...@confluent.io.invalid>
> > wrote:
> >
> >> Hi Nick,
> >>
> >> NT4: As discussed, we will still require locking in the new protocol
> >> to avoid concurrent read/write access on the checkpoint file, at least
> >> as long as KIP-1035 hasn't landed. However, as you correctly pointed
> >> out, the assignor will have to accept offsets for overlapping sets of
> >> dormant tasks. I updated the KIP to make this explicit. If the
> >> corresponding offset information for one task conflicts between
> >> clients (which can happen), the conflict is resolved by taking the
> >> maximum of the offsets.
> >>
> >> Cheers,
> >> Lucas
> >>
> >> On Fri, Aug 16, 2024 at 7:14 PM Guozhang Wang
> >> <guozhang.wang...@gmail.com> wrote:
> >>>
> >>> Hello Lucas,
> >>>
> >>> Thanks for the great KIP. I've read it through and it looks good to
> >>> me. As we've discussed, much of my thoughts would be outside the scope
> >>> of this very well scoped and defined KIP, so I will omit them for now.
> >>>
> >>> The only one I had related to this KIP is about topology updating.
> >>> I understand the motivation of the proposal is that basically since each
> >>> time group forming a (new) generation may potentially accept not all
> >>> of the members joining because of the timing of the RPCs, the group's
> >>> topology ID may be not reflecting the "actual" most recent topologies
> >>> if some zombie members holding an old topology form a group generation
> >>> quickly enough, which would effectively mean that zombie members
> >>> actually blocking other real members from getting tasks assigned. On
> >>> the other hand, like you've mentioned already in the doc, requesting
> >>> some sort of ID ordering by pushing the burden on the user's side
> >>> would also be too much for users, increasing the risk of human errors
> >>> in operations.
> >>>
> >>> I'm wondering if instead of trying to be smart programmingly, we just
> >>> let the protocol to act dumbly (details below). The main reasons I had
> >>> in mind are:
> >>>
> >>> 1) Upon topology changes, some tasks may no longer exist in the new
> >>> topology, so still letting them execute on the clients which do not
> >>> yet have the new topology would waste resources.
> >>>
> >>> 2) As we discussed, trying to act smart introduces more complexities
> >>> in the coordinator that tries to balance different assignment goals
> >>> between stickiness, balance, and now topology mis-matches between
> >>> clients.
> >>>
> >>> 3) Scenarios that mismatching topologies be observed within a group generation:
> >>> a. Zombie / old clients that do not have the new topology, and will
> >>> never have.
> >>> b. During a rolling bounce upgrade, where not-yet-bounced clients
> >>> would not yet have the new topology.
> >>> c.
Let's assume we would not ever have scenarios where users want
> >>> to intentionally have a subset of clients within a group running a
> >>> partial / subset of the full sub-topologies, since such cases can well
> >>> be covered by a custom assignor that takes into those considerations
> >>> by never assigning some tasks to some clients etc. That means, the
> >>> only scenarios we would need to consider are a) and b).
> >>>
> >>> For b), I think it's actually okay to temporarily block the progress
> >>> of the group until everyone is bounced with the updated topology; as
> >>> for a), originally I thought having one or a few clients blocking the
> >>> whole group would be a big problem, but now that I think more, I felt
> >>> from the operations point of view, just letting the app being blocked
> >>> with a informational log entry to quickly ping-down the zombie clients
> >>> may actually be acceptable. All in all, it makes the code simpler
> >>> programmingly by not trying to abstract away issue scenario a) from
> >>> the users (or operators) but letting them know asap.
> >>>
> >>> ----------
> >>>
> >>> Other than that, everything else looks good to me.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Fri, Aug 16, 2024 at 7:38 AM Nick Telford <nick.telf...@gmail.com> wrote:
> >>>>
> >>>> Hi Lucas,
> >>>>
> >>>> NT4.
> >>>> Given that the new assignment procedure guarantees that a Task has been
> >>>> closed before it is assigned to a different client, I don't think there
> >>>> should be a problem with concurrent access? I don't think we should worry
> >>>> too much about 1035 here, as it's orthogonal to 1071. I don't think that
> >>>> 1035 *requires* the locking, and indeed once 1071 is the only assignment
> >>>> mechanism, we should be able to do away with the locking completely (I
> >>>> think).
> >>>> > >>>> Anyway, given your point about it not being possible to guarantee > >> disjoint > >>>> sets, does it make sense to require clients to continue to supply the > >> lags > >>>> for only a subset of the dormant Tasks on-disk? Wouldn't it be simpler > >> to > >>>> just have them supply everything, since the assignor has to handle > >>>> overlapping sets anyway? > >>>> > >>>> Cheers, > >>>> Nick > >>>> > >>>> On Fri, 16 Aug 2024 at 13:51, Lucas Brutschy <lbruts...@confluent.io > >> .invalid> > >>>> wrote: > >>>> > >>>>> Hi Nick, > >>>>> > >>>>> NT4. I think it will be hard anyway to ensure that the assignor > >> always > >>>>> gets disjoint sets (there is no synchronized rebalance point anymore, > >>>>> so locks wouldn't prevent two clients reporting the same dormant > >>>>> task). So I think we'll have to lift this restriction. I was thinking > >>>>> more that locking is required to prevent concurrent access. In > >>>>> particular, I was expecting that the lock will avoid two threads > >>>>> opening the same RocksDB in KIP-1035. Wouldn't this cause problems? > >>>>> > >>>>> Cheers, > >>>>> Lucas > >>>>> > >>>>> On Fri, Aug 16, 2024 at 11:34 AM Nick Telford < > >> nick.telf...@gmail.com> > >>>>> wrote: > >>>>>> > >>>>>> Hi Lucas, > >>>>>> > >>>>>> NT4. > >>>>>> The reason I mentioned this was because, while implementing 1035, I > >>>>>> stumbled across a problem: initially I had changed it so that > >> threads > >>>>>> always reported the lag for *all* dormant Tasks on-disk, even if > >> it meant > >>>>>> multiple threads reporting lag for the same Tasks. I found that > >> this > >>>>> didn't > >>>>>> work, apparently because the assignor assumes that multiple > >> threads on > >>>>> the > >>>>>> same instance always report disjoint sets. 
> >>>>>> > >>>>>> From reading through 1071, it sounded like this assumption is no > >> longer > >>>>>> being made by the assignor, and that the processId field would > >> allow the > >>>>>> assignor to understand when multiple clients reporting lag for the > >> same > >>>>>> Tasks are on the same instance. This would enable us to do away > >> with the > >>>>>> locking when reporting lag, and just have threads report the lag > >> for > >>>>> every > >>>>>> Task on-disk, even if other threads are reporting lag for the same > >> Tasks. > >>>>>> > >>>>>> But it sounds like this is not correct, and that the new assignor > >> will > >>>>> make > >>>>>> the same assumptions as the old one? > >>>>>> > >>>>>> Regards, > >>>>>> Nick > >>>>>> > >>>>>> On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy < > >> lbruts...@confluent.io > >>>>> .invalid> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi Nick! > >>>>>>> > >>>>>>> Thanks for getting involved in the discussion. > >>>>>>> > >>>>>>> NT1. We are always referring to offsets in the changelog topics > >> here. > >>>>>>> I tried to make it more consistent. But in the schemas and API, > >> I find > >>>>>>> "task changelog end offset" a bit lengthy, so we use "task > >> offset" and > >>>>>>> "task end offset" for short. We could change it, if people think > >> this > >>>>>>> is confusing. > >>>>>>> > >>>>>>> NT2. You are right. The confusing part is that the current > >> streams > >>>>>>> config is called `max.warmup.replicas`, but in the new protocol, > >> we > >>>>>>> are bounding the group-level parameter using > >>>>>>> `group.streams.max.warmup.replicas`. If we wanted to keep > >>>>>>> `group.streams.max.warmup.replicas` for the config name on the > >>>>>>> group-level, we'd have to bound it using > >>>>>>> `group.streams.max.max.warmup.replicas`. I prefer not doing > >> this, but > >>>>>>> open to suggestions. > >>>>>>> > >>>>>>> NT3. You are right, we do not need to make it this restrictive. 
I > >>>>>>> think the main problem with having 10,000 warm-up replicas would > >> be > >>>>>>> that it slows down the assignment inside the broker - once we are > >>>>>>> closer to production-ready implementation, we may have better > >> numbers > >>>>>>> of this and may revisit these defaults. I'll set the max to 100 > >> for > >>>>>>> now, but it would be good to hear what values people typically > >> use in > >>>>>>> their production workloads. > >>>>>>> > >>>>>>> NT4. We will actually only report the offsets if we manage to > >> acquire > >>>>>>> the lock. I tried to make this more precise. I suppose also with > >>>>>>> KIP-1035, we'd require the lock to read the offset? > >>>>>>> > >>>>>>> Cheers, > >>>>>>> Lucas > >>>>>>> > >>>>>>> On Thu, Aug 15, 2024 at 8:40 PM Nick Telford < > >> nick.telf...@gmail.com> > >>>>>>> wrote: > >>>>>>>> > >>>>>>>> Hi everyone, > >>>>>>>> > >>>>>>>> Looks really promising, and I can see this resolving several > >> issues > >>>>> I've > >>>>>>>> noticed. I particularly like the choice to use a String for > >>>>> Subtopology > >>>>>>> ID, > >>>>>>>> as it will (eventually) lead to a better solution to KIP-816. > >>>>>>>> > >>>>>>>> I noticed a few typos in the KIP that I thought I'd mention: > >>>>>>>> > >>>>>>>> NT1. > >>>>>>>> In several places you refer to "task changelog end offsets", > >> while in > >>>>>>>> others, you call it "task end offsets". Which is it? > >>>>>>>> > >>>>>>>> NT2. > >>>>>>>> Under "Group Configurations", you included > >>>>>>>> "group.streams.max.warmup.replicas", but I think you meant > >>>>>>>> "group.streams.num.warmup.replicas"? > >>>>>>>> > >>>>>>>> NT3. > >>>>>>>> Not a typo, but a suggestion: it makes sense to set the > >> default for > >>>>>>>> "group.streams.num.warmup.replicas" to 2, for compatibility > >> with the > >>>>>>>> existing defaults, but why set the default for > >>>>>>>> "group.streams.max.warmup.replicas" to only 4? That seems > >> extremely > >>>>>>>> restrictive. 
These "max" configs are typically used to prevent > >> a > >>>>> subset > >>>>>>> of > >>>>>>>> users causing problems on the shared broker cluster - what's > >> the > >>>>> reason > >>>>>>> to > >>>>>>>> set such a restrictive value for max warmup replicas? If I had > >> 10,000 > >>>>>>>> warmup replicas, would it cause a noticeable problem on the > >> brokers? > >>>>>>>> > >>>>>>>> NT4. > >>>>>>>> It's implied that clients send the changelog offsets for *all* > >>>>> dormant > >>>>>>>> stateful Tasks, but the current behaviour is that clients will > >> only > >>>>> send > >>>>>>>> the changelog offsets for the stateful Tasks that they are > >> able to > >>>>> lock > >>>>>>>> on-disk. Since this is a change in behaviour, perhaps this > >> should be > >>>>>>> called > >>>>>>>> out explicitly? > >>>>>>>> > >>>>>>>> Regards, > >>>>>>>> Nick > >>>>>>>> > >>>>>>>> On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy < > >> lbruts...@confluent.io > >>>>>>> .invalid> > >>>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hi Andrew, > >>>>>>>>> > >>>>>>>>> thanks for the comment. > >>>>>>>>> > >>>>>>>>> AS12: I clarified the command-line interface. It's supposed > >> to be > >>>>> used > >>>>>>>>> with --reset-offsets and --delete-offsets. I removed --topic. > >>>>>>>>> > >>>>>>>>> AS13: Yes, it's --delete. I clarified the command-line > >> interface. > >>>>>>>>> > >>>>>>>>> Cheers, > >>>>>>>>> Lucas > >>>>>>>>> > >>>>>>>>> On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield > >>>>>>>>> <andrew_schofi...@live.com> wrote: > >>>>>>>>>> > >>>>>>>>>> Hi Lucas, > >>>>>>>>>> Thanks for the KIP update. > >>>>>>>>>> > >>>>>>>>>> I think that `kafka-streams-groups.sh` looks like a good > >>>>> equivalent > >>>>>>> to > >>>>>>>>>> the tools for the other types of groups. > >>>>>>>>>> > >>>>>>>>>> AS12: In kafka-streams-groups.sh, the description for the > >>>>>>>>>> --input-topics option seems insufficient. 
Why is an input > >> topic > >>>>>>> specified > >>>>>>>>>> with this option different than a topic specified with > >> --topic? > >>>>> Why > >>>>>>> is > >>>>>>>>>> It --input-topics rather than --input-topic? Which action > >> of this > >>>>>>> tool > >>>>>>>>>> does this option apply to? > >>>>>>>>>> > >>>>>>>>>> AS13: Similarly, for --internal-topics, which action of > >> the tool > >>>>>>> does it > >>>>>>>>>> apply to? I suppose it’s --delete, but it’s not clear to > >> me. > >>>>>>>>>> > >>>>>>>>>> Thanks, > >>>>>>>>>> Andrew > >>>>>>>>>> > >>>>>>>>>>> On 11 Aug 2024, at 12:10, Lucas Brutschy < > >>>>> lbruts...@confluent.io > >>>>>>> .INVALID> > >>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>> Hi Andrew/Lianet, > >>>>>>>>>>> > >>>>>>>>>>> I have added an administrative command-line tool > >> (replacing > >>>>>>>>>>> `kafka-streams-application-reset`) and extensions of the > >> Admin > >>>>> API > >>>>>>> for > >>>>>>>>>>> listing, deleting, describing groups and listing, > >> altering and > >>>>>>>>>>> deleting offsets for streams groups. No new RPCs have to > >> be > >>>>> added, > >>>>>>>>>>> however, we duplicate some of the API in the admin > >> client that > >>>>>>> exist > >>>>>>>>>>> for consumer groups. It seems to me cleaner to duplicate > >> some > >>>>>>>>>>> code/interface here, instead of using "consumer group" > >> APIs for > >>>>>>>>>>> streams groups, or renaming existing APIs that use > >>>>> "consumerGroup" > >>>>>>> in > >>>>>>>>>>> the name to something more generic (which wouldn't cover > >> share > >>>>>>>>>>> groups). > >>>>>>>>>>> > >>>>>>>>>>> I think for now, all comments are addressed. > >>>>>>>>>>> > >>>>>>>>>>> Cheers, > >>>>>>>>>>> Lucas > >>>>>>>>>>> > >>>>>>>>>>> On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy < > >>>>>>> lbruts...@confluent.io> > >>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>> Hi Lianet and Andrew, > >>>>>>>>>>>> > >>>>>>>>>>>> LM1/LM2: You are right. 
The idea is to omit fields > >> exactly in > >>>>> the > >>>>>>> same > >>>>>>>>>>>> situations as in KIP-848. In the KIP, I stuck with how > >> the > >>>>>>> behavior > >>>>>>>>>>>> was defined in KIP-848 (e.g. KIP-848 defined that that > >>>>> instance ID > >>>>>>>>>>>> will be omitted if it did not change since the last > >>>>> heartbeat). > >>>>>>> But > >>>>>>>>>>>> you are correct that the implementation handles these > >> details > >>>>>>> slightly > >>>>>>>>>>>> differently. I updated the KIP to match more closely the > >>>>> behavior > >>>>>>> of > >>>>>>>>>>>> the KIP-848 implementation. > >>>>>>>>>>>> > >>>>>>>>>>>> LM9: Yes, there are several options to do this. The > >> idea is to > >>>>>>> have > >>>>>>>>>>>> only one client initialize the topology, not all > >> clients. It > >>>>> seems > >>>>>>>>>>>> easier to understand on the protocol level (otherwise > >> we'd > >>>>> have N > >>>>>>>>>>>> topology initializations racing with a hard-to-determine > >>>>> winner). > >>>>>>> We > >>>>>>>>>>>> also expect the payload of the request to grow in the > >> future > >>>>> and > >>>>>>> want > >>>>>>>>>>>> to avoid the overhead of having all clients sending the > >>>>> topology > >>>>>>> at > >>>>>>>>>>>> the same time. But initializing the group could take > >> some > >>>>> time - > >>>>>>> we > >>>>>>>>>>>> have to create internal topics, and maybe a client is > >>>>>>> malfunctioning > >>>>>>>>>>>> and the initialization has to be retried. It seemed a > >> bit > >>>>>>> confusing to > >>>>>>>>>>>> return errors to all other clients that are trying to > >> join the > >>>>>>> group > >>>>>>>>>>>> during that time - as if there was a problem with > >> joining the > >>>>>>> group / > >>>>>>>>>>>> the contents of the heartbeat. It seems cleaner to me > >> to let > >>>>> all > >>>>>>>>>>>> clients successfully join the group and heartbeat, but > >> remain > >>>>> in > >>>>>>> an > >>>>>>>>>>>> INITIALIZING state which does not yet assign any tasks. 
> >> Does > >>>>> that > >>>>>>> make > >>>>>>>>>>>> sense to you? You are right that returning a retriable > >> error > >>>>> and > >>>>>>>>>>>> having all clients retry until the group is initialized > >> would > >>>>> also > >>>>>>>>>>>> work, it just doesn't model well that "everything is > >> going > >>>>>>> according > >>>>>>>>>>>> to plan". > >>>>>>>>>>>> As for the order of the calls - yes, I think it is fine > >> to > >>>>> allow > >>>>>>> an > >>>>>>>>>>>> Initialize RPC before the first heartbeat for supporting > >>>>> future > >>>>>>> admin > >>>>>>>>>>>> tools. I made this change throughout the KIP, thanks! > >>>>>>>>>>>> > >>>>>>>>>>>> AS11: Yes, your understanding is correct. The number of > >> tasks > >>>>> for > >>>>>>> one > >>>>>>>>>>>> subtopology is the maximum number of partitions in any > >> of the > >>>>>>> matched > >>>>>>>>>>>> topics. What will happen in Kafka Streams is that the > >>>>> partitions > >>>>>>> of > >>>>>>>>>>>> the matched topics will effectively be merged during > >> stream > >>>>>>>>>>>> processing, so in your example, subtopology:0 would > >> consume > >>>>> from > >>>>>>> AB:0 > >>>>>>>>>>>> and AC:0. > >>>>>>>>>>>> > >>>>>>>>>>>> Cheers, > >>>>>>>>>>>> Lucas > >>>>>>>>>>>> > >>>>>>>>>>>> On Fri, Aug 2, 2024 at 9:47 PM Lianet M. < > >> liane...@gmail.com> > >>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hi Bruno, answering your questions: > >>>>>>>>>>>>> > >>>>>>>>>>>>> About the full heartbeat (LM1): I just wanted to > >> confirm that > >>>>>>> you'll > >>>>>>>>> be > >>>>>>>>>>>>> sending full HBs in case of errors in general. It's > >> not clear > >>>>>>> from > >>>>>>>>> the KIP, > >>>>>>>>>>>>> since it referred to sending Id/epoch and whatever had > >>>>> changed > >>>>>>> since > >>>>>>>>> the > >>>>>>>>>>>>> last HB only. Sending full HB on error is key to > >> ensure fresh > >>>>>>>>> rejoins after > >>>>>>>>>>>>> fencing for instance, and retries with all relevant > >> info. 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> About the instanceId (LM2): The instanceId is needed on > >>>>> every HB > >>>>>>> to > >>>>>>>>> be able > >>>>>>>>>>>>> to identify a member using one that is already taken. > >> On > >>>>> every > >>>>>>> HB, > >>>>>>>>> the > >>>>>>>>>>>>> broker uses the instance id (if any) to retrieve the > >> member > >>>>> ID > >>>>>>>>> associated > >>>>>>>>>>>>> with it, and checks it against the memberId received > >> in the > >>>>> HB > >>>>>>>>>>>>> (throwing UnreleasedInstance exception if needed). So > >>>>> similar to > >>>>>>> my > >>>>>>>>>>>>> previous point, just wanted to confirm that we are > >>>>> considering > >>>>>>> that > >>>>>>>>> here > >>>>>>>>>>>>> too. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Now some other thoughts: > >>>>>>>>>>>>> > >>>>>>>>>>>>> LM9: Definitely interesting imo if we can avoid the > >>>>> dependency > >>>>>>>>> between the > >>>>>>>>>>>>> StreamsGroupInitialize and the StreamsGroupHeartbeat. I > >>>>> totally > >>>>>>> get > >>>>>>>>> that > >>>>>>>>>>>>> the initial client implementation will do a HB first, > >> and > >>>>> that's > >>>>>>>>> fine, but > >>>>>>>>>>>>> not having the flow enforced at the protocol level > >> would > >>>>> allow > >>>>>>> for > >>>>>>>>> further > >>>>>>>>>>>>> improvement in the future (that initialize via admin > >> idea you > >>>>>>>>> mentioned, > >>>>>>>>>>>>> for instance). Actually, I may be missing something > >> about > >>>>> the HB, > >>>>>>>>> but if we > >>>>>>>>>>>>> are at the point where HB requires that the topology > >> has been > >>>>>>>>> initialized, > >>>>>>>>>>>>> and the topology init requires the group, why is it the > >>>>> heartbeat > >>>>>>>>> RPC the > >>>>>>>>>>>>> one responsible for the group creation? (vs. > >>>>>>> StreamsGroupInitialize > >>>>>>>>> creates > >>>>>>>>>>>>> group if needed + HB just fails if topology not > >> initialized) > >>>>>>>>>>>>> > >>>>>>>>>>>>> Thanks! 
> >>>>>>>>>>>>> > >>>>>>>>>>>>> Lianet > >>>>>>>>>>>>> (I didn't miss your answer on my INVALID_GROUP_TYPE > >> proposal, > >>>>>>> just > >>>>>>>>> still > >>>>>>>>>>>>> thinking about it in sync with the same discussion > >> we're > >>>>> having > >>>>>>> on > >>>>>>>>> the > >>>>>>>>>>>>> KIP-1043 thread...I'll come back on that) > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Thu, Aug 1, 2024 at 10:55 AM Andrew Schofield < > >>>>>>>>> andrew_schofi...@live.com> > >>>>>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hi Bruno, > >>>>>>>>>>>>>> Thanks for adding the detail on the schemas on records > >>>>> written > >>>>>>> to > >>>>>>>>>>>>>> __consumer_offsets. > >>>>>>>>>>>>>> I’ve reviewed them in detail and they look good to > >> me. I > >>>>> have > >>>>>>> one > >>>>>>>>> naive > >>>>>>>>>>>>>> question. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> AS11: I notice that an assignment is essentially a > >> set of > >>>>>>> partition > >>>>>>>>>>>>>> indices for > >>>>>>>>>>>>>> subtopologies. Since a subtopology can be defined by a > >>>>> source > >>>>>>> topic > >>>>>>>>> regex, > >>>>>>>>>>>>>> does > >>>>>>>>>>>>>> this mean that an assignment gives the same set of > >> partition > >>>>>>>>> indices for > >>>>>>>>>>>>>> all topics > >>>>>>>>>>>>>> which happen to match the regex? So, a subtopology > >> reading > >>>>> from > >>>>>>> A* > >>>>>>>>> that > >>>>>>>>>>>>>> matches > >>>>>>>>>>>>>> AB and AC would give the same set of partitions to > >> each > >>>>> task for > >>>>>>>>> both > >>>>>>>>>>>>>> topics, and > >>>>>>>>>>>>>> is not able to give AB:0 to one task and AC:0 to a > >> different > >>>>>>> task. > >>>>>>>>> Is this > >>>>>>>>>>>>>> correct? > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Thanks, > >>>>>>>>>>>>>> Andrew > >>>>>>>>>>>>>> > >>>>>>>>>>>>>>> On 23 Jul 2024, at 16:16, Bruno Cadonna < > >>>>> cado...@apache.org> > >>>>>>>>> wrote: > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Hi Lianet, > >>>>>>>>>>>>>>> > >>>>>>>>>>>>>>> Thanks for the review! 
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Here are my answers:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> LM1. Is your question whether we need to send a full heartbeat each time the member re-joins the group even if the information in the RPC did not change since the last heartbeat?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> LM2. Is the reason for sending the instance ID each time that a member could shut down, change the instance ID and then start and heartbeat again, but the group coordinator would never notice that the instance ID changed?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> LM3. I see your point. I am wondering whether this additional information is worth the dependency between the group types. To return INVALID_GROUP_TYPE, the group coordinator needs to know that a group ID exists with a different group type. With a group coordinator as we have it now in Apache Kafka that manages all group types, that is not a big deal, but imagine if we (or some implementation of the Apache Kafka protocol) decide to have a separate group coordinator for each group type.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> LM4. Using INVALID_GROUP_ID if the group ID is empty makes sense to me. I am going to change that.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> LM5. I think there is a dependency from the StreamsGroupInitialize RPC to the heartbeat. The group must exist when the initialize RPC is received by the group coordinator.
> >>>>>>>>>>>>>>> The group is created by the heartbeat RPC. I would be in favor of making the initialize RPC independent from the heartbeat RPC. That would allow initializing a streams group explicitly with an admin tool.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> LM6. I think it affects streams and streams should behave as the consumer group.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> LM7. Good point that we will consider.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> LM8. Fixed! Thanks!
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 7/19/24 9:53 PM, Lianet M. wrote:
> >>>>>>>>>>>>>>>> Hi Lucas/Bruno, thanks for the great KIP! First comments:
> >>>>>>>>>>>>>>>> LM1. Related to where the KIP says: *“Group ID, member ID, member epoch are sent with each heartbeat request. Any other information that has not changed since the last heartbeat can be omitted.”* I expect all the other info also needs to be sent whenever a full heartbeat is required (even if it didn’t change from the last heartbeat), ex. on fencing scenarios, correct?
> >>>>>>>>>>>>>>>> LM2. For consumer groups we always send the groupInstanceId (if any) as part of every heartbeat, along with memberId, epoch and groupId. Should we consider that too here?
> >>>>>>>>>>>>>>>> LM3.
> >>>>>>>>>>>>>>>> We’re proposing returning a GROUP_ID_NOT_FOUND error in response to the stream-specific RPCs if the groupId is associated with a group type that is not streams (ie. consumer group or share group). I wonder if at this point, where we're getting several new group types added, each with RPCs that are supposed to include groupId of a certain type, we should be more explicit about this situation. Maybe a kind of INVALID_GROUP_TYPE (group exists but not with a valid type for this RPC) vs a GROUP_ID_NOT_FOUND (group does not exist). Those errors would be consistently used across consumer, share, and streams RPCs whenever the group id is not of the expected type.
> >>>>>>>>>>>>>>>> This is truly not specific to this KIP, and should be addressed with all group types and their RPCs in mind. I just wanted to bring out my concern and get thoughts around it.
> >>>>>>>>>>>>>>>> LM4. On a related note, StreamsGroupDescribe returns INVALID_REQUEST if groupId is empty. There is already an INVALID_GROUP_ID error, that seems more specific to this situation. Error handling of specific errors would definitely be easier than having to deal with a generic INVALID_REQUEST (and probably its custom message).
> >>>>>>>>>>>>>>>> I know that for KIP-848 we have INVALID_REQUEST for similar situations, so if ever we take down this path we should review it there too for consistency. Thoughts?
> >>>>>>>>>>>>>>>> LM5. The dependency between the StreamsGroupHeartbeat RPC and the StreamsGroupInitialize RPC is one-way only right? HB requires a previous StreamsGroupInitialize request, but StreamsGroupInitialize processing is totally independent of heartbeats (and could perfectly be processed without a previous HB, even though the client implementation we’re proposing won’t go down that path). Is my understanding correct? Just to double check, seems sensible like that at the protocol level.
> >>>>>>>>>>>>>>>> LM6. With KIP-848, there is an important improvement that brings a difference in behaviour around the static membership: with the classic protocol, if a static member joins with a group instance already in use, it makes the initial member fail with a FENCED_INSTANCE_ID exception, vs. with the new consumer group protocol, the second member trying to join fails with an UNRELEASED_INSTANCE_ID. Does this change need to be considered in any way for the streams app? (I'm not familiar with KS yet, but thought it was worth asking.
> >>>>>>>>>>>>>>>> If it doesn't affect it in any way, it may still be helpful to call it out in a section on static membership)
> >>>>>>>>>>>>>>>> LM7. Regarding the admin tool to manage streams groups. We can discuss whether to have it here or separately, but I think we should aim for some basic admin capabilities from the start, mainly because I believe it will be very helpful/needed in practice during the impl of the KIP. From experience with KIP-848, we felt a bit blindfolded in the initial phase where we still didn't have kafka-consumer-groups dealing with the new groups (and then it was very helpful and used when we were able to easily inspect them from the console)
> >>>>>>>>>>>>>>>> LM8. nit: the links to KIP-848 are not quite right (pointing to an unrelated “Future work section” at the end of KIP-848)
> >>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>> Lianet
> >>>>>>>>>>>>>>>> On Fri, Jul 19, 2024 at 11:13 AM Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
> >>>>>>>>>>>>>>>>> Hi Andrew,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> AS2: I added a note for now. If others feel strongly about it, we can still add more administrative tools to the KIP - it should not change the overall story significantly.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> AS8: "streams.group.assignor.name" sounds good to me to distinguish the config from class names.
> >>>>>>>>>>>>>>>>> Not sure if I like the "default". To be consistent, we'd then have to call it `group.streams.default.session.timeout.ms` as well. I only added the `.name` on both broker and group level for now.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> AS10: Ah, I misread your comment, now I know what you meant. Good point, fixed (by Bruno).
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, Jul 19, 2024 at 4:44 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi Lucas,
> >>>>>>>>>>>>>>>>>> I see that I hit send too quickly. One more comment:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> AS2: I think stating that there will be a `kafka-streams-group.sh` in a future KIP is fine to keep this KIP focused. Personally, I would probably put all of the gory details in this KIP, but then it’s not my KIP. A future pointer is fine too.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On 19 Jul 2024, at 13:46, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Hi Andrew,
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> thanks for getting the discussion going! Here are my responses.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS1: Good point, done.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS2: We were planning to add more administrative tools to the interface in a follow-up KIP, to not make this KIP too large. If people think that it would help to understand the overall picture if we already add something like `kafka-streams-groups.sh`, we will do that. I also agree that we should address how this relates to KIP-1043, we'll add it.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS3: Good idea, that's more consistent with `assigning` and `reconciling` etc.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS4: Thanks, Fixed.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS5: Good catch. This was supposed to mean that we require CREATE on cluster or CREATE on all topics, not both. Fixed.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS6: Thanks, Fixed.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS7. Thanks, Fixed.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS8: I think this works a bit differently in this KIP than in consumer groups. KIP-848 lets the members vote for a preferred assignor, and the broker-side assignor is picked by majority vote. The `group.consumer.assignors` specifies the list of assignors that are supported on the broker, and is configurable because the interface is pluggable. In this KIP, the task assignor is not voted on by members but configured on the broker-side.
> >>>>>>>>>>>>>>>>>>> `group.streams.assignor` is used for this, and uses a specific name. If we make the task assignor pluggable on the broker-side, we'd introduce a separate config `group.streams.assignors`, which would indeed be a list of class names. I think there is no conflict here, the two configurations serve different purposes. The only gripe I'd have here is naming, as `group.streams.assignor` and `group.streams.assignors` would be a bit similar, but I cannot really think of a better name for `group.streams.assignor`, so I'd probably rather introduce `group.streams.assignors` as `group.streams.possible_assignors` or something like that.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS9: I added explanations for the various record types. Apart from the new topology record, and the partition metadata (which is based on the topology and can only be created once we have a topology initialized) the lifecycle for the records is basically identical to that in KIP-848.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> AS10: In the consumer offset topic, the version in the key is used to differentiate different schema types with the same content. So the keys are not versioned, but the version field is "abused" as a type tag.
> >>>>>>>>>>>>>>>>>>> This is the same in KIP-848, we followed it for consistency.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>>>>> Lucas
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Thu, Jul 18, 2024 at 1:27 PM Andrew Schofield <andrew_schofi...@live.com> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Hi Lucas and Bruno,
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks for the great KIP.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I've read through the document and have some initial comments.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS1: I suppose that there is a new o.a.k.common.GroupType.STREAMS enumeration constant. This is a change to the public interface and should be called out.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS2: Since streams groups are no longer consumer groups, how does the user manipulate them, observe lag and so on? Will you add `kafka-streams-groups.sh` or extend `kafka-streams-application-reset.sh`? Of course, KIP-1043 can easily be extended to support streams groups, but that only lets the user see the groups, not manipulate them.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS3: I wonder whether the streams group state of UNINITIALIZED would be better expressed as INITIALIZING.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS4: In StreamsGroupInitializeRequest, Topology[].SourceTopicRegex should be nullable.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS5: Why does StreamsGroupInitialize require CREATE permission on the cluster resource? I imagine that this is one of the ways that the request might be granted permission to create the StateChangelogTopics and RepartitionSourceTopics, but if it is granted permission to create those topics with specific ACLs, would CREATE on the cluster resource still be required?
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS6: StreamsGroupInitialize can also fail with TOPIC_AUTHORIZATION_FAILED and (subject to AS5) CLUSTER_AUTHORIZATION_FAILED.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS7: A tiny nit. You've used TopologyID (capitals) in StreamsGroupHeartbeatRequest and a few others, but in all other cases the fields which are ids are spelled Id. I suggest TopologyId.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Also, "interal" is probably meant to be "interval".
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS8: For consumer groups, the `group.consumer.assignors` configuration is a list of class names. The assignors do have names too, but the configuration which enables them is in terms of class names.
> >>>>>>>>>>>>>>>>>>>> I wonder whether the broker’s group.streams.assignor could actually be `group.streams.assignors` and specified as a list of the class names of the supplied assignors. I know you're not supporting other assignors yet, but when you do, I expect you would prefer to have used class names from the start.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The use of assignor names in the other places looks good to me.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS9: I'd find it really helpful to have a bit of a description about the purpose and lifecycle of the 9 record types you've introduced on the __consumer_offsets topic. I did a cursory review but without really understanding what's written when, I can't do a thorough job of reviewing.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> AS10: In the definitions of the record keys, such as StreamsGroupCurrentMemberAssignmentKey, the versions of the fields must match the versions of the types.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>>> Andrew
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On 12 Jul 2024, at 09:04, Lucas Brutschy <lbruts...@confluent.io.INVALID> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I would like to start a discussion thread on KIP-1071: Streams Rebalance Protocol. With this KIP, we aim to bring the principles laid down by KIP-848 to Kafka Streams, to make rebalances more reliable and scalable, and make Kafka Streams overall easier to deploy and operate. The KIP proposes moving the assignment logic to the broker, and introducing a dedicated group type and dedicated RPCs for Kafka Streams.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> The KIP is here:
> >>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1071%3A+Streams+Rebalance+Protocol
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> This is joint work with Bruno Cadonna.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Please take a look and let us know what you think.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>> Lucas