Hi Lucas, Thanks for the KIP.
PY1: In StreamsGroupHeartbeatRequest, it uses broker and zkBroker in listeners field. Is this intended? IIUC, the KIP will be implemented in 4.1. The zookeeper will be removed in 4.0. Probably, we should change it as broker and controller. We may need a similar change for StreamsGroupDescribeRequest. Best, PoAn On 2024/09/02 12:23:22 Bruno Cadonna wrote: > Hi all, > > For your info, I updated the StreamsGroupInitialize request with the > following changes: > > 1. I added the topology ID to the request so that the group coordinator > knows for which topology it got the initialization. > > 2. I renamed field "Subtopology" to "SubtopologyId" since the field is > the ID of the subtopology but that was not clear from the name. > > Best, > Bruno > > > On 8/28/24 2:06 PM, Lucas Brutschy wrote: > > Hi Sophie, > > > > Thanks for your detailed comments - much appreciated! I think you read > > a version of the KIP that did not yet include the admin command-line > > tool and the Admin API extensions, so some of the comments are already > > addressed in the KIP. > > > > S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in the > > consumer background thread. Note that in the new consumer threading > > model, all RPCs are run by the background thread. Check out this: > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design > > for more information. Both our RPCs are just part of the group > > membership management and do not need to be invoked explicitly by the > > application thread. You could imagine calling the initialize RPC > > explicitly (to implement explicit initialization), but this would > > still mean sending an event to the background thread, and the > > background thread in turn invokes the RPC. However, explicit > > initialization would require some additional public interfaces that we > > are not including in this KIP. StreamsGroupDescribe is called by the > > AdminClient, and used by the command-line tool > > kafka-streams-groups.sh. > > > > S2. I think the max.warmup.replicas=100 suggested by Nick was intended > > as the upper limit for setting the group configuration on the broker. > > Just to make sure that this was not a misunderstanding. By default, > > values above 100 should be rejected when setting a specific value for > > group. Are you suggesting 20 or 30 for the default value for groups, > > or the default upper limit for the group configuration? > > > > S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The > > MemberEpoch=-1 is a left-over from an earlier discussion. It means > > that the member is leaving the group, so the intention was that the > > member must leave the group when it asks the other members to shut > > down. We later reconsidered this and decided that all applications > > should just react to the shutdown application signal that is returned > > by the broker, so the client first sets the ShutdownApplication and > > later leaves the group. Thanks for spotting this, I removed it. > > > > S4. Not sure if this refers to the latest version of the KIP. We added > > an extension of the admin API and a kafka-streams-groups.sh > > command-line tool to the KIP already. > > > > S5. All RPCs for dealing with offsets will keep working with streams > > groups. The extension of the admin API is rather cosmetic, since the > > method names use "consumer group". The RPCs, however, are generic and > > do not need to be changed. > > > > S6. Yes, you can use the DeleteGroup RPC with any group ID, whether > > streams group or not. > > > > S7. See the admin API section. > > > > S8. I guess for both A and B, I am not sure what you are suggesting. > > Do you want to make the broker-side topology immutable and not include > > any information about the topology, like the topology ID in the RPC? > > It would seem that this would be a massive food-gun for people, if > > they start changing their topology and don't notice that the broker is > > looking at a completely different version of the topology. Or did you > > mean that there is some kind of topology ID, so that at least we can > > detect inconsistencies between broker and client-side topologies, and > > we fence out any member with an incorrect topology ID? Then we seem to > > end up with mostly the same RPCs and the same questions (how is the > > topology ID generated?). I agree that the latter could be an option. > > See summary at the end of this message. > > > > S9. If we flat out refuse topology updates, agree - we cannot let the > > application crash on minor changes to the topology. However, if we > > allow topology updates as described in the KIP, there are only upsides > > to making the topology ID more sensitive. All it will cause is that a > > client will have to resend a `StreamsGroupInitialize`, and during the > > rolling bounce, older clients will not get tasks assigned. > > > > S10. The intention in the KIP is really just this - old clients can > > only retain tasks, but not get new ones. If the topology has > > sub-topologies, these will initially be assigned to the new clients, > > which also have the new topology. You are right that rolling an > > application where the old structure and the new structure are > > incompatible (e.g. different subtopologies access the same partition) > > will cause problems. But this will also cause problems in the current > > protocol, so I'm not sure, if it's strictly a regression, it's just > > unsupported (which we can only make the best effort to detect). > > > > In summary, for the topology ID discussion, I mainly see two options: > > 1) stick with the current KIP proposal > > 2) define the topology ID as the hash of the StreamsGroupInitialize > > topology metadata only, as you suggested, and fence out members with > > an incompatible topology ID. > > I think doing 2 is also a good option, but in the end it comes down to > > how important we believe topology updates (where the set of > > sub-topologies or set of internal topics are affected) to be. The main > > goal is to make the new protocol a drop-in replacement for the old > > protocol, and at least define the RPCs in a way that we won't need > > another KIP which bumps the RPC version to make the protocol > > production-ready. Adding more command-line tools, or public interfaces > > seems less critical as it doesn't change the protocol and can be done > > easily later on. The main question becomes - is the protocol > > production-ready and sufficient to replace the classic protocol if we > > go with 2? > > > > Cheers, > > Lucas >