Hi PoAn, thanks for looking into the KIP!
PY1: You are right, zkBroker was added following KIP-848 during
development, and is indeed not required anymore. We don't want to allow
these RPCs on the controller, so we have to remove `zkBroker`, not
replace it with `controller`. I updated the KIP; I see you already fixed
it in trunk.

Cheers,
Lucas

On Thu, Dec 26, 2024 at 11:28 AM PoAn Yang <pay...@apache.org> wrote:
>
> Hi Lucas,
>
> Thanks for the KIP.
>
> PY1: In StreamsGroupHeartbeatRequest, it uses broker and zkBroker in
> the listeners field. Is this intended? IIUC, the KIP will be
> implemented in 4.1, and ZooKeeper will be removed in 4.0. Probably, we
> should change it to broker and controller. We may need a similar
> change for StreamsGroupDescribeRequest.
>
> Best,
> PoAn
>
> On 2024/09/02 12:23:22 Bruno Cadonna wrote:
> > Hi all,
> >
> > For your info, I updated the StreamsGroupInitialize request with the
> > following changes:
> >
> > 1. I added the topology ID to the request so that the group
> > coordinator knows for which topology it got the initialization.
> >
> > 2. I renamed field "Subtopology" to "SubtopologyId" since the field
> > is the ID of the subtopology, but that was not clear from the name.
> >
> > Best,
> > Bruno
> >
> >
> > On 8/28/24 2:06 PM, Lucas Brutschy wrote:
> > > Hi Sophie,
> > >
> > > Thanks for your detailed comments - much appreciated! I think you
> > > read a version of the KIP that did not yet include the admin
> > > command-line tool and the Admin API extensions, so some of the
> > > comments are already addressed in the KIP.
> > >
> > > S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in
> > > the consumer background thread. Note that in the new consumer
> > > threading model, all RPCs are run by the background thread. Check out
> > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
> > > for more information.
> > > Both our RPCs are just part of group membership management and do
> > > not need to be invoked explicitly by the application thread. You
> > > could imagine calling the initialize RPC explicitly (to implement
> > > explicit initialization), but this would still mean sending an
> > > event to the background thread, and the background thread in turn
> > > invokes the RPC. However, explicit initialization would require
> > > some additional public interfaces that we are not including in this
> > > KIP. StreamsGroupDescribe is called by the AdminClient and used by
> > > the command-line tool kafka-streams-groups.sh.
> > >
> > > S2. I think the max.warmup.replicas=100 suggested by Nick was
> > > intended as the upper limit for setting the group configuration on
> > > the broker - just to make sure that this was not a
> > > misunderstanding. By default, values above 100 should be rejected
> > > when setting a specific value for a group. Are you suggesting 20 or
> > > 30 for the default value for groups, or for the default upper limit
> > > of the group configuration?
> > >
> > > S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The
> > > MemberEpoch=-1 is a left-over from an earlier discussion. It means
> > > that the member is leaving the group, so the intention was that the
> > > member must leave the group when it asks the other members to shut
> > > down. We later reconsidered this and decided that all applications
> > > should just react to the shutdown application signal that is
> > > returned by the broker, so the client first sets
> > > ShutdownApplication and later leaves the group. Thanks for spotting
> > > this, I removed it.
> > >
> > > S4. Not sure if this refers to the latest version of the KIP. We
> > > already added an extension of the admin API and a
> > > kafka-streams-groups.sh command-line tool to the KIP.
> > >
> > > S5. All RPCs for dealing with offsets will keep working with
> > > streams groups.
> > > The extension of the admin API is rather cosmetic, since the
> > > method names use "consumer group". The RPCs, however, are generic
> > > and do not need to be changed.
> > >
> > > S6. Yes, you can use the DeleteGroup RPC with any group ID, whether
> > > it is a streams group or not.
> > >
> > > S7. See the admin API section.
> > >
> > > S8. I guess for both A and B, I am not sure what you are
> > > suggesting. Do you want to make the broker-side topology immutable
> > > and not include any information about the topology, like the
> > > topology ID, in the RPC? That would seem to be a massive foot-gun
> > > for people: they could start changing their topology and not notice
> > > that the broker is looking at a completely different version of it.
> > > Or did you mean that there is some kind of topology ID, so that at
> > > least we can detect inconsistencies between broker- and client-side
> > > topologies, and we fence out any member with an incorrect topology
> > > ID? Then we seem to end up with mostly the same RPCs and the same
> > > questions (how is the topology ID generated?). I agree that the
> > > latter could be an option. See the summary at the end of this
> > > message.
> > >
> > > S9. If we flat out refuse topology updates, agreed - we cannot let
> > > the application crash on minor changes to the topology. However, if
> > > we allow topology updates as described in the KIP, there are only
> > > upsides to making the topology ID more sensitive. All it will cause
> > > is that a client will have to resend a `StreamsGroupInitialize`,
> > > and during the rolling bounce, older clients will not get tasks
> > > assigned.
> > >
> > > S10. The intention in the KIP is really just this: old clients can
> > > only retain tasks, but not get new ones. If the topology has new
> > > sub-topologies, these will initially be assigned to the new
> > > clients, which also have the new topology.
> > > You are right that rolling an application where the old structure
> > > and the new structure are incompatible (e.g. different
> > > sub-topologies accessing the same partition) will cause problems.
> > > But this will also cause problems in the current protocol, so I'm
> > > not sure it's strictly a regression; it's just unsupported (which
> > > we can only make a best effort to detect).
> > >
> > > In summary, for the topology ID discussion, I mainly see two
> > > options:
> > > 1) stick with the current KIP proposal
> > > 2) define the topology ID as the hash of the StreamsGroupInitialize
> > > topology metadata only, as you suggested, and fence out members
> > > with an incompatible topology ID.
> > > I think doing 2 is also a good option, but in the end it comes down
> > > to how important we believe topology updates (where the set of
> > > sub-topologies or the set of internal topics is affected) to be.
> > > The main goal is to make the new protocol a drop-in replacement for
> > > the old protocol, and at least to define the RPCs in a way that we
> > > won't need another KIP which bumps the RPC version to make the
> > > protocol production-ready. Adding more command-line tools or public
> > > interfaces seems less critical, as it doesn't change the protocol
> > > and can easily be done later on. The main question becomes: is the
> > > protocol production-ready and sufficient to replace the classic
> > > protocol if we go with 2?
> > >
> > > Cheers,
> > > Lucas
> >
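
[Editor's note: to make option 2 above concrete, here is a rough,
illustrative sketch of how a deterministic topology ID could be derived
from only the StreamsGroupInitialize metadata. It is not from the KIP;
the class, method, and field names are made up, and the exact set of
hashed fields is precisely the open question in the thread.]

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TopologyIdSketch {

    // Hypothetical helper: derive a topology ID by hashing only the
    // metadata that would be sent in StreamsGroupInitialize, here modeled
    // as sub-topology IDs mapped to their topics. Because the input is
    // sorted (TreeMap) and the encoding uses explicit separators, equal
    // metadata always yields the same ID, and any structural change
    // (added sub-topology, renamed topic) yields a different one.
    static String topologyId(TreeMap<String, List<String>> subtopologies) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            for (Map.Entry<String, List<String>> entry : subtopologies.entrySet()) {
                digest.update(entry.getKey().getBytes(StandardCharsets.UTF_8));
                for (String topic : entry.getValue()) {
                    digest.update((byte) 0); // separator: avoids ambiguous concatenation
                    digest.update(topic.getBytes(StandardCharsets.UTF_8));
                }
                digest.update((byte) 1); // marks the end of one sub-topology
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    public static void main(String[] args) {
        TreeMap<String, List<String>> topology = new TreeMap<>();
        topology.put("0", List.of("input-topic"));
        topology.put("1", List.of("app-repartition-topic"));
        System.out.println("topology ID: " + topologyId(topology));
    }
}
```

Hashing only this structural metadata means renaming processors or
changing processing logic would not change the ID - which is exactly the
"more sensitive vs. less sensitive" trade-off discussed under S9.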