Hi PoAn,

thanks for looking into the KIP!

PY1: You are right, zkBroker was added following KIP-848 during
development, and is indeed not required anymore. We don't want to
allow these RPCs on the controller, so we have to remove `zkBroker`,
not replace it with `controller`. I updated the KIP; I see you already
fixed it in trunk.
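
For reference, the change is confined to the `listeners` field of the
two RPC definitions. Roughly (excerpt only; the apiKey and remaining
fields are omitted):

  // StreamsGroupHeartbeatRequest.json
  // (StreamsGroupDescribeRequest.json is analogous)
  {
    "type": "request",
    "listeners": ["broker"],   // previously ["broker", "zkBroker"]
    "name": "StreamsGroupHeartbeatRequest",
    ...
  }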

Cheers,
Lucas

On Thu, Dec 26, 2024 at 11:28 AM PoAn Yang <pay...@apache.org> wrote:
>
> Hi Lucas,
>
> Thanks for the KIP.
>
> PY1: In StreamsGroupHeartbeatRequest, the listeners field uses broker
> and zkBroker. Is this intended? IIUC, the KIP will be implemented in
> 4.1, and ZooKeeper will be removed in 4.0. Probably we should change
> it to broker and controller. We may need a similar change for
> StreamsGroupDescribeRequest.
>
> Best,
> PoAn
>
> On 2024/09/02 12:23:22 Bruno Cadonna wrote:
> > Hi all,
> >
> > For your info, I updated the StreamsGroupInitialize request with the
> > following changes:
> >
> > 1. I added the topology ID to the request so that the group coordinator
> > knows which topology the initialization is for.
> >
> > 2. I renamed field "Subtopology" to "SubtopologyId" since the field is
> > the ID of the subtopology but that was not clear from the name.
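> >
> > To make the shape concrete, the relevant excerpt of the request schema
> > now looks roughly as follows (an illustrative sketch, not the full
> > definition):
> >
> >   { "name": "StreamsGroupInitializeRequest",
> >     "fields": [
> >       ...
> >       { "name": "TopologyId", "type": "string",
> >         "about": "The ID of the topology to initialize." },
> >       { "name": "Topology", "type": "[]Subtopology", "fields": [
> >         { "name": "SubtopologyId", "type": "string",
> >           "about": "The ID of the subtopology." },
> >         ...
> >       ]}
> >     ]
> >   }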
> >
> > Best,
> > Bruno
> >
> >
> > On 8/28/24 2:06 PM, Lucas Brutschy wrote:
> > > Hi Sophie,
> > >
> > > Thanks for your detailed comments - much appreciated! I think you read
> > > a version of the KIP that did not yet include the admin command-line
> > > tool and the Admin API extensions, so some of the comments are already
> > > addressed in the KIP.
> > >
> > > S1. StreamsGroupHeartbeat and StreamsGroupInitialize are called in the
> > > consumer background thread. Note that in the new consumer threading
> > > model, all RPCs are run by the background thread. Check out this:
> > > https://cwiki.apache.org/confluence/display/KAFKA/Consumer+threading+refactor+design
> > > for more information. Both our RPCs are just part of the group
> > > membership management and do not need to be invoked explicitly by the
> > > application thread. You could imagine calling the initialize RPC
> > > explicitly (to implement explicit initialization), but this would
> > > still mean sending an event to the background thread, and the
> > > background thread in turn invokes the RPC. However, explicit
> > > initialization would require some additional public interfaces that we
> > > are not including in this KIP. StreamsGroupDescribe is called by the
> > > AdminClient, and used by the command-line tool
> > > kafka-streams-groups.sh.
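> > >
> > > To illustrate the admin side, usage would look roughly like the
> > > sketch below; describeStreamsGroups and its result type are my
> > > shorthand for the Admin extension proposed in the KIP, mirroring
> > > describeConsumerGroups:
> > >
> > >   import java.util.List;
> > >   import java.util.Properties;
> > >   import org.apache.kafka.clients.admin.Admin;
> > >
> > >   Properties props = new Properties();
> > >   props.put("bootstrap.servers", "localhost:9092");
> > >   try (Admin admin = Admin.create(props)) {
> > >       // Mirrors describeConsumerGroups: one future per group ID.
> > >       var description = admin
> > >           .describeStreamsGroups(List.of("my-streams-app"))
> > >           .describedGroups().get("my-streams-app").get();
> > >       System.out.println(description);
> > >   }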
> > >
> > > S2. I think the max.warmup.replicas=100 suggested by Nick was intended
> > > as the upper limit for setting the group configuration on the broker.
> > > Just to make sure that this was not a misunderstanding. By default,
> > > values above 100 should be rejected when setting a specific value for a
> > > group. Are you suggesting 20 or 30 as the default value for groups,
> > > or the default upper limit for the group configuration?
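> > >
> > > For reference, a per-group override would be set via the GROUP config
> > > resource introduced for group configurations in KIP-848; the config
> > > name below is illustrative, not final:
> > >
> > >   import java.util.List;
> > >   import java.util.Map;
> > >   import org.apache.kafka.clients.admin.AlterConfigOp;
> > >   import org.apache.kafka.clients.admin.ConfigEntry;
> > >   import org.apache.kafka.common.config.ConfigResource;
> > >
> > >   // Given an Admin client `admin`:
> > >   ConfigResource group =
> > >       new ConfigResource(ConfigResource.Type.GROUP, "my-streams-app");
> > >   AlterConfigOp op = new AlterConfigOp(
> > >       new ConfigEntry("streams.num.warmup.replicas", "30"),
> > >       AlterConfigOp.OpType.SET);
> > >   admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();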
> > >
> > > S3. Yes, it's supposed to be used like SHUTDOWN_APPLICATION. The
> > > MemberEpoch=-1 is a left-over from an earlier discussion. It means
> > > that the member is leaving the group, so the intention was that the
> > > member must leave the group when it asks the other members to shut
> > > down. We later reconsidered this and decided that all applications
> > > should just react to the shutdown application signal that is returned
> > > by the broker, so the client first sets the ShutdownApplication and
> > > later leaves the group. Thanks for spotting this, I removed it.
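> > >
> > > So the intended order of operations on each member is roughly the
> > > following (all names here are hypothetical, not final protocol
> > > types):
> > >
> > >   // React to the broker-returned signal first, leave afterwards:
> > >   void onHeartbeatResponse(HeartbeatResponse response) {
> > >       if (response.shutdownApplication()) {
> > >           closeStreamsInstance();    // shut this instance down
> > >           sendLeaveGroupHeartbeat(); // final heartbeat, MemberEpoch = -1
> > >       }
> > >   }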
> > >
> > > S4. Not sure if this refers to the latest version of the KIP. We added
> > > an extension of the admin API and a kafka-streams-groups.sh
> > > command-line tool to the KIP already.
> > >
> > > S5. All RPCs for dealing with offsets will keep working with streams
> > > groups. The extension of the admin API is rather cosmetic, since the
> > > method names use "consumer group". The RPCs, however, are generic and
> > > do not need to be changed.
> > >
> > > S6. Yes, you can use the DeleteGroup RPC with any group ID, whether
> > > streams group or not.
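> > >
> > > For example, the existing Admin methods keep working for a streams
> > > group despite the "consumer group" naming, because the underlying
> > > OffsetFetch and DeleteGroups RPCs are generic:
> > >
> > >   import java.util.List;
> > >
> > >   // "my-streams-app" is a streams group ID here; `admin` is an
> > >   // Admin client.
> > >   var offsets = admin.listConsumerGroupOffsets("my-streams-app")
> > >                      .partitionsToOffsetAndMetadata().get();
> > >   admin.deleteConsumerGroups(List.of("my-streams-app")).all().get();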
> > >
> > > S7. See the admin API section.
> > >
> > > S8. I guess for both A and B, I am not sure what you are suggesting.
> > > Do you want to make the broker-side topology immutable and not include
> > > any information about the topology, like the topology ID in the RPC?
> > > It would seem that this would be a massive foot-gun for people if
> > > they start changing their topology and don't notice that the broker is
> > > looking at a completely different version of the topology. Or did you
> > > mean that there is some kind of topology ID, so that at least we can
> > > detect inconsistencies between broker and client-side topologies, and
> > > we fence out any member with an incorrect topology ID? Then we seem to
> > > end up with mostly the same RPCs and the same questions (how is the
> > > topology ID generated?). I agree that the latter could be an option.
> > > See summary at the end of this message.
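> > >
> > > To sketch what option 2 in the summary below would look like: the
> > > topology ID could simply be a digest of a canonical rendering of the
> > > StreamsGroupInitialize metadata (how exactly to canonicalize that
> > > metadata would still need to be pinned down):
> > >
> > >   import java.nio.charset.StandardCharsets;
> > >   import java.security.MessageDigest;
> > >   import java.util.HexFormat;
> > >
> > >   // Hash only the initialization metadata (e.g. sorted subtopology
> > >   // IDs plus their source and repartition topics), so that purely
> > >   // cosmetic topology changes do not change the ID.
> > >   static String topologyId(String canonicalMetadata) throws Exception {
> > >       byte[] hash = MessageDigest.getInstance("SHA-256")
> > >           .digest(canonicalMetadata.getBytes(StandardCharsets.UTF_8));
> > >       return HexFormat.of().formatHex(hash);
> > >   }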
> > >
> > > S9. If we flat out refuse topology updates, agreed: we cannot let the
> > > application crash on minor changes to the topology. However, if we
> > > allow topology updates as described in the KIP, there are only upsides
> > > to making the topology ID more sensitive. All it will cause is that a
> > > client will have to resend a `StreamsGroupInitialize`, and during the
> > > rolling bounce, older clients will not get tasks assigned.
> > >
> > > S10. The intention in the KIP is really just this - old clients can
> > > only retain tasks, but not get new ones. If the topology has new
> > > sub-topologies, these will initially be assigned to the new clients,
> > > which also have the new topology. You are right that rolling an
> > > application where the old structure and the new structure are
> > > incompatible (e.g. different subtopologies access the same partition)
> > > will cause problems. But this will also cause problems in the current
> > > protocol, so I'm not sure it's strictly a regression; it's just
> > > unsupported (which we can only make a best effort to detect).
> > >
> > > In summary, for the topology ID discussion, I mainly see two options:
> > > 1) stick with the current KIP proposal
> > > 2) define the topology ID as the hash of the StreamsGroupInitialize
> > > topology metadata only, as you suggested, and fence out members with
> > > an incompatible topology ID.
> > > I think doing 2 is also a good option, but in the end it comes down to
> > > how important we believe topology updates (ones that affect the set of
> > > sub-topologies or the set of internal topics) to be. The main
> > > goal is to make the new protocol a drop-in replacement for the old
> > > protocol, and at least define the RPCs in a way that we won't need
> > > another KIP which bumps the RPC version to make the protocol
> > > production-ready. Adding more command-line tools or public interfaces
> > > seems less critical, as it doesn't change the protocol and can be done
> > > easily later on. The main question becomes: is the protocol
> > > production-ready and sufficient to replace the classic protocol if we
> > > go with 2?
> > >
> > > Cheers,
> > > Lucas
> >
