Hey Jason, I like your idea of simplifying the upgrade protocol to allow static and dynamic members to co-exist. Admittedly it may make the coordinator-side logic a bit more complex, but I think it is worth doing.
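To show what that extra coordinator-side logic might look like, here is a minimal Java sketch of the bookkeeping in Jason's proposal, which is quoted further down. Static members stay in every rebalance even without an active session, and the coordinator reuses their cached subscription. A static member that leaves or times out starts an expiration timer instead of triggering a rebalance. All class, field and method names are hypothetical and are not Kafka's coordinator API. The 30-minute value is only the example Jason gives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the mixed static/dynamic proposal; not Kafka's coordinator code.
public class MixedMembershipSketch {
    // The "longish" static expiration; 30 minutes is only the example value from the proposal.
    static final long STATIC_EXPIRATION_MS = 30 * 60 * 1000L;

    static final class Member {
        final String memberId;
        final String memberName;          // null for dynamic members
        final List<String> subscription;  // cached so it can be forwarded without a session
        boolean sessionActive = true;
        long expireAtMs = Long.MAX_VALUE;

        Member(String memberId, String memberName, List<String> subscription) {
            this.memberId = memberId;
            this.memberName = memberName;
            this.subscription = subscription;
        }

        boolean isStatic() {
            return memberName != null;
        }
    }

    final Map<String, Member> members = new HashMap<>();

    // A (re)join registers the member with a fresh, active session.
    void join(Member m) {
        members.put(m.memberId, m);
    }

    // Step 4: returns true only if the departure should trigger a rebalance.
    boolean onLeaveOrSessionExpiry(String memberId, long nowMs) {
        Member m = members.get(memberId);
        if (m == null) {
            return false;
        }
        if (m.isStatic()) {
            // Static member: keep it registered and start the expiration timer instead.
            m.sessionActive = false;
            m.expireAtMs = nowMs + STATIC_EXPIRATION_MS;
            return false;
        }
        members.remove(memberId); // dynamic members behave as today
        return true;
    }

    // Drops static registrations whose timer fired; true means membership changed (rebalance).
    boolean expireStaticMembers(long nowMs) {
        return members.values().removeIf(m -> m.isStatic() && !m.sessionActive && nowMs >= m.expireAtMs);
    }

    // Step 2: only members with an active session are awaited during a rebalance.
    List<String> membersToAwait() {
        List<String> ids = new ArrayList<>();
        for (Member m : members.values()) {
            if (m.sessionActive) {
                ids.add(m.memberId);
            }
        }
        return ids;
    }

    // Step 3: the leader still sees every member, using cached subscriptions for inactive static ones.
    Map<String, List<String>> subscriptionsForLeader() {
        Map<String, List<String>> out = new HashMap<>();
        for (Member m : members.values()) {
            out.put(m.memberId, m.subscription);
        }
        return out;
    }
}
```

With this shape, the open question about where the expiration timeout is configured only affects how `STATIC_EXPIRATION_MS` is supplied. The rebalance decisions stay the same.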
Originally I was trying to kill more birds with one stone with KIP-345, e.g. to fix the multi-rebalance issue when starting up / shutting down a multi-instance client (mentioned as cases 1) and 2) in my earlier email), and hence proposed a pure static-membership protocol. But on second thought I now feel that may be too ambitious and is worth fixing in another KIP. With that, I think what you've proposed here is a good way to go for KIP-345 itself. Note there are a few details in your proposal we'd still need to figure out:

1. How is this longish static member expiration timeout defined? Is it via a broker config, hence global, or via a client config that can be communicated to the broker via the JoinGroupRequest?

2. Assuming that for static members a LEAVE_GROUP request will not trigger a rebalance immediately either (just like a session timeout, only the longer member expiration timeout would), can we then remove the internal "internal.leave.group.on.close" config, which was only a quick workaround?

Guozhang

On Fri, Aug 24, 2018 at 11:14 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey All,
>
> Nice to see some solid progress on this. It sounds like one of the complications is allowing static and dynamic registration to coexist. I'm wondering if we can do something like the following:
>
> 1. Statically registered members (those joining the group with a non-null `member.name`) maintain a session with the coordinator just like dynamic members.
> 2. If a session is active for a static member when a rebalance begins, then basically we'll keep the current behavior. The rebalance will await the static member joining the group.
> 3. If a static member does not have an active session, then the coordinator will not wait for it to join, but will still include it in the rebalance. The coordinator will forward the cached subscription information to the leader and will cache the assignment after the rebalance completes.
> (Note that we still have the generationId to fence offset commits from a static zombie if the assignment changes.)
> 4. When a static member leaves the group or has its session expire, no rebalance is triggered. Instead, we can begin a timer to expire the static registration. This would be a longish timeout (like 30 minutes, say).
>
> So basically static members participate in all rebalances regardless of whether they have an active session. In a given rebalance, some of the members may be static and some dynamic. The group leader can differentiate the two based on the presence of the `member.name` (we have to add this to the JoinGroupResponse). Generally speaking, we would choose leaders preferentially from the active members that support the latest JoinGroup protocol and are using static membership. If we have to choose a leader with an old version, however, it would see all members in the group (static or dynamic) as dynamic members and perform the assignment as usual.
>
> Would that work?
>
> -Jason
>
>
> On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Boyang,
> >
> > Thanks for the updated proposal, a few questions:
> >
> > 1. Where will "change-group-timeout" be communicated to the broker? Will that be a new field in the JoinGroupRequest, or are we going to piggy-back on the existing session-timeout field (assuming that the original value will not be used anywhere in the static membership any more)?
> >
> > 2. "However, if the consumer takes longer than session timeout to return, we shall still trigger rebalance but it could still try to catch `change-group-timeout`.": what does this mean? I thought your proposal is that for static memberships, the broker will NOT trigger a rebalance even after session-timeout has been detected, but only after change-group-timeout, which is supposed to be defined as longer than session-timeout?
> >
> > 3. "A join group request with member.name set will be treated as `static-membership` strategy": in this case, how would the switch from dynamic to static happen, since whoever changed the member.name to not-null will be rejected, right?
> >
> > 4. "just erase the cached mapping, and wait for session timeout to trigger rebalance should be sufficient.": this is also a bit unclear to me. Who will erase the cached mapping? Since it is on the broker side I assume that the broker has to do it. Are you suggesting to use a new request for it?
> >
> > 5. "Halfway switch": following 3) above, if your proposal is basically to let the "first join-request win", and the strategy will stay as is until all members are gone, then this will also not happen, since whoever uses a different strategy from the first guy who sent a join-group request will be rejected, right?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Aug 21, 2018 at 9:28 AM, John Roesler <j...@confluent.io> wrote:
> >
> > > This sounds good to me!
> > >
> > > Thanks for the time you've spent on it,
> > > -John
> > >
> > > On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen <bche...@outlook.com> wrote:
> > >
> > > > Thanks Matthias for the input. Sorry, I was busy recently and haven't had time to update this thread. To summarize what we have come up with so far, here is a draft updated plan:
> > > >
> > > >
> > > > Introduce a new config called `member.name` which is supposed to be provided uniquely by the consumer client. The broker will maintain a cache with [key: member.name, value: member.id]. A join group request with member.name set will be treated as `static-membership` strategy, and the broker will then reject any join group request without member.name. So this coordination change will be differentiated from the `dynamic-membership` protocol we currently have.
> > > >
> > > >
> > > > When handling a static join group request:
> > > >
> > > > 1. The broker will check the membership to see whether this is a new member. If it is new, the broker allocates a unique member id, caches the mapping and moves to the rebalance stage.
> > > > 2. Following 1, if this is an existing member, the broker will not change the group state, and returns its cached member.id and current assignment (unless this is the leader, in which case we shall trigger a rebalance).
> > > > 3. Although Guozhang has mentioned we could rejoin with the member name and id pair, I think for the join group request it is OK to leave the member id blank, as the member name is the unique identifier. In the commit offset request we *must* have both.
> > > >
> > > >
> > > > When handling a commit offset request with static membership enabled, each commit request must carry both member.name and member.id to be identified as a `certificated member`. If not, this means there are duplicate consumer members with the same member name, and the request will be rejected to guarantee consumption uniqueness.
> > > >
> > > >
> > > > When rolling restarting or shutting down gracefully, the client will send a leave group request (static membership mode). In static membership, we will also define a `change-group-timeout`, provided by the leader, to hold off the rebalance. So we will wait for all the members to rejoin the group and do exactly one rebalance, since all members are expected to rejoin within the timeout. If a consumer crashes, the join group request from the restarted consumer will be recognized as an existing member and be handled as in condition 1 above; However, if the consumer takes longer than session timeout to return, we shall still trigger rebalance but it could still try to catch `change-group-timeout`. If it fails to catch the second timeout, its cached state on the broker will be garbage collected, and a new rebalance will be triggered when it finally joins.
> > > >
> > > >
> > > > And consider the switch between dynamic and static membership.
> > > >
> > > > 1. Dynamic to static: the first joiner shall revise the membership to static and wait for all the current members to restart, since their membership is still dynamic. Here our assumption is that the restart process shouldn't take a long time, as a long restart would break the `rebalance timeout` in whatever membership protocol we are using. Before the restart, all dynamic member join requests will be rejected.
> > > > 2. Static to dynamic: this is more like a downgrade which should be smooth: just erase the cached mapping, and wait for session timeout to trigger rebalance should be sufficient. (Fallback to current behavior)
> > > > 3. Halfway switch: a corner case is when some clients keep dynamic membership while others keep static membership. This will cause the group to rebalance forever without progress, because the dynamic/static states keep bouncing off each other. This could guarantee that we will not make the consumer group work in a wrong state by having half static and half dynamic.
> > > >
> > > > To guarantee correctness, we will also push the member name/id pair to the `__consumer_offsets` topic (as Matthias pointed out) and upgrade the API version. These details will be further discussed back in the KIP.
> > > >
> > > >
> > > > Are there any concerns with this high-level proposal? Just want to reiterate the core idea of the KIP: "If the broker recognizes this consumer as an existing member, it shouldn't trigger rebalance".
> > > >
> > > > Thanks a lot for everyone's input! I feel this proposal is much more robust than the previous one!
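The broker-side part of the draft can be sketched in Java as a name-to-id cache plus an assignment cache. The sketch covers three behaviours: a join without `member.name` is rejected, a new name gets a fresh id and triggers a rebalance, and a known name gets its cached id and assignment back without one, unless it belongs to the leader. `StaticJoinHandler`, `JoinResult`, the `member-N` id format and the rule that the first member becomes leader are all assumptions for illustration. The draft does not specify them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the draft's static JoinGroup handling; not Kafka code.
public class StaticJoinHandler {
    static final class JoinResult {
        final String memberId;
        final boolean triggersRebalance;
        final String assignment; // cached assignment, null until one has been recorded

        JoinResult(String memberId, boolean triggersRebalance, String assignment) {
            this.memberId = memberId;
            this.triggersRebalance = triggersRebalance;
            this.assignment = assignment;
        }
    }

    private final Map<String, String> idByName = new HashMap<>();      // [member.name -> member.id]
    private final Map<String, String> assignmentById = new HashMap<>(); // [member.id -> assignment]
    private String leaderId;
    private int nextId = 0;

    JoinResult handleStaticJoin(String memberName) {
        if (memberName == null) {
            // The draft rejects any join group request without member.name in a static group.
            throw new IllegalArgumentException("member.name is required in a static group");
        }
        String cachedId = idByName.get(memberName);
        if (cachedId == null) {
            // Case 1: new member -> allocate a unique id, cache the mapping, and rebalance.
            String newId = "member-" + (nextId++);
            idByName.put(memberName, newId);
            if (leaderId == null) {
                leaderId = newId; // assumption: the first member becomes leader
            }
            return new JoinResult(newId, true, null);
        }
        // Case 2: existing member -> keep group state and return the cached id and assignment,
        // unless it is the leader, which still triggers a rebalance.
        boolean isLeader = cachedId.equals(leaderId);
        return new JoinResult(cachedId, isLeader, assignmentById.get(cachedId));
    }

    void recordAssignment(String memberId, String assignment) {
        assignmentById.put(memberId, assignment);
    }
}
```

The leader exception is the only path by which a known static member causes a rebalance here. That matches the core idea Boyang restates: an existing member should not trigger one.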
> > > >
> > > >
> > > > Best,
> > > >
> > > > Boyang
> > > >
> > > > ________________________________
> > > > From: Matthias J. Sax <matth...@confluent.io>
> > > > Sent: Friday, August 10, 2018 2:24 AM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id
> > > >
> > > > Hi,
> > > >
> > > > thanks for the detailed discussion. I learned a lot about internals again :)
> > > >
> > > > I like the idea of a user config `member.name` and keeping `member.id` internal. I also agree with Guozhang that reusing `client.id` might not be a good idea.
> > > >
> > > > To clarify the algorithm, each time we generate a new `member.id`, we also need to update the "group membership" information (ie, mapping [member.id, Assignment]), right? Ie, the new `member.id` replaces the old entry in the cache.
> > > >
> > > > I also think we need to preserve the `member.name -> member.id` mapping in the `__consumer_offsets` topic. The KIP should mention this IMHO.
> > > >
> > > > As for changing the default value of the config `leave.group.on.close`: I agree with John that we should not change the default, because it would impact all consumer groups with dynamic assignment. However, I think we can document that if static assignment is used (ie, `member.name` is configured) we never send a LeaveGroupRequest, regardless of the config. Note that the config is internal, so I'm not sure how to document this in detail. We should not expose the internal config in the docs.
> > > >
> > > > About upgrading: why do we need to have two rolling bounces and encode "static" vs "dynamic" in the JoinGroupRequest?
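Matthias makes two suggestions above. First, a newly generated `member.id` should replace the old entry in the [member.id, Assignment] mapping. Second, a LeaveGroupRequest should never be sent once `member.name` is set. A Java sketch of both follows. It assumes the two mappings are plain in-memory maps and models the internal leave-on-close setting as a boolean parameter. `MemberIdRotation`, `rotate` and `shouldSendLeaveGroup` are hypothetical names. Persisting the mapping to `__consumer_offsets` is not shown.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not Kafka's coordinator or consumer code.
public class MemberIdRotation {
    final Map<String, String> idByName = new HashMap<>();       // [member.name -> member.id]
    final Map<String, String> assignmentById = new HashMap<>(); // [member.id -> Assignment]

    // Registers a new member.id for a name; the old id's entry is replaced, not kept alongside.
    void rotate(String memberName, String newMemberId) {
        String oldId = idByName.put(memberName, newMemberId);
        if (oldId != null) {
            String assignment = assignmentById.remove(oldId);
            if (assignment != null) {
                assignmentById.put(newMemberId, assignment);
            }
        }
    }

    // Static members (member.name set) never send LeaveGroup, whatever the internal setting says.
    static boolean shouldSendLeaveGroup(String memberName, boolean leaveGroupOnClose) {
        return memberName == null && leaveGroupOnClose;
    }
}
```

Removing the old key is what lets the coordinator treat a request carrying the stale id as coming from an unknown member.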
> > > >
> > > > If we upgrade an existing consumer group from dynamic to static, I don't see any reason why both should not work together, and why a single rolling bounce would not be sufficient. If we bounce the first consumer and switch from dynamic to static, it sends a `member.name` and the broker registers the [member.name, member.id] in the cache. Why would this interfere with all the other consumers that use dynamic assignment?
> > > >
> > > > Also, Guozhang mentioned that for all other requests, we need to check if the mapping [member.name, member.id] contains the sent `member.id` -- I don't think this is necessary -- it seems to be sufficient to check the `member.id` against the [member.id, Assignment] mapping as we do today -- thus, checking `member.id` does not require any change IMHO.
> > > >
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 8/7/18 7:13 PM, Guozhang Wang wrote:
> > > > > @James
> > > > >
> > > > > What you described is true: the transition from dynamic to static membership is not thought through yet. But I do not think it is an impossible problem: note that we did move the offset commit from ZK to the Kafka coordinator in 0.8.2 :) The migration plan was to first do double-commits to both ZK and the coordinator, and then do a second round to turn ZK off.
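Matthias argues that the `member.id` check against the [member.id, Assignment] mapping alone is sufficient. Combined with the generationId fencing Jason mentions, an offset-commit check could be sketched in Java as below. The enum values are named after Kafka's real `UNKNOWN_MEMBER_ID` and `ILLEGAL_GENERATION` error codes, but `CommitValidator` and `validateCommit` are hypothetical and are not the broker's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical offset-commit validation that needs no member.name lookup; not Kafka code.
public class CommitValidator {
    enum Result { OK, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION }

    final Map<String, String> assignmentById = new HashMap<>(); // [member.id -> Assignment]
    int generationId = 0;

    Result validateCommit(String memberId, int requestGeneration) {
        if (!assignmentById.containsKey(memberId)) {
            // A zombie still holding a replaced member.id is no longer in the mapping.
            return Result.UNKNOWN_MEMBER_ID;
        }
        if (requestGeneration != generationId) {
            // The assignment changed since this member last synced, so fence the commit.
            return Result.ILLEGAL_GENERATION;
        }
        return Result.OK;
    }
}
```

Because a rotated id disappears from the mapping, a duplicate consumer reusing an old id is rejected without the extra name lookup.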
> > > > >
> > > > > So just to throw out a wild idea here, also following a two-rolling-bounce approach: in the JoinGroupRequest we can set the flag to "static" while still keeping the registry-id field empty. In this case the coordinator still follows the "dynamic" logic, accepting the request while allowing the protocol to be set to "static". After the first rolling bounce, the group protocol is already "static"; then a second rolling bounce is triggered, and this time we set the registry-id.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <wushuja...@gmail.com> wrote:
> > > > >
> > > > > > Guozhang, in a previous message, you proposed this:
> > > > > >
> > > > > > > On Jul 30, 2018, at 3:56 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > 1. We bump up the JoinGroupRequest with additional fields:
> > > > > > >
> > > > > > > 1.a) a flag indicating "static" or "dynamic" membership protocols.
> > > > > > > 1.b) with "static" membership, we also add the pre-defined member id.
> > > > > > > 1.c) with "static" membership, we also add an optional "group-change-timeout" value.
> > > > > > >
> > > > > > > 2. On the broker side, we enforce only one of the two protocols for all group members: we accept the protocol of the first joined member of the group, and if later joining members indicate a different membership protocol, we reject them. If the group-change-timeout value is different from that of the first joined member, we reject it as well.
> > > > > >
> > > > > >
> > > > > > What will happen if we have an already-deployed application that wants to switch to using static membership? Let's say there are 10 instances of it. As the instances go through a rolling restart, they will switch from dynamic membership (the default?) to static membership. As each one leaves the group and restarts, it will be rejected from the group (because the group is currently using dynamic membership). The group will shrink down until there is 1 node handling all the traffic. After that one restarts, the group will switch over to static membership.
> > > > > >
> > > > > > Is that right? That means that the transition plan from dynamic to static membership isn't very smooth.
> > > > > >
> > > > > > I'm not really sure what can be done in this case. This reminds me of the transition plans that were discussed for moving from ZooKeeper-based consumers to Kafka-coordinator-based consumers. That was also hard, and ultimately we decided not to build that.
> > > > > >
> > > > > > -James
> >
> > --
> > -- Guozhang

--
-- Guozhang
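James's rolling-restart scenario under the Jul 30 "first joiner's protocol wins" rule can be checked with a small Java toy simulation. It rests on three assumptions: instances restart one at a time, a rejected instance stays out of the group until it retries at the end, and the group's protocol is reset only when the group is empty. None of the names are Kafka APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy simulation of a dynamic-to-static rolling restart under the
// "first joiner's protocol wins" rule. Hypothetical model, not Kafka code.
public class RollingUpgradeSimulation {
    enum Protocol { DYNAMIC, STATIC }

    static final class Group {
        Protocol protocol; // decided by the first joiner of an empty group
        final Set<Integer> members = new HashSet<>();

        boolean tryJoin(int instance, Protocol p) {
            if (members.isEmpty()) {
                protocol = p; // first joiner decides the protocol
            }
            if (p != protocol) {
                return false; // a mismatched protocol is rejected
            }
            members.add(instance);
            return true;
        }
    }

    // Group size after each instance restarts as static, then after rejected instances retry.
    static List<Integer> simulate(int instances) {
        Group g = new Group();
        for (int i = 0; i < instances; i++) {
            g.tryJoin(i, Protocol.DYNAMIC);
        }
        List<Integer> sizes = new ArrayList<>();
        Set<Integer> rejected = new HashSet<>();
        for (int i = 0; i < instances; i++) {
            g.members.remove(i); // instance i leaves to restart
            if (!g.tryJoin(i, Protocol.STATIC)) {
                rejected.add(i);
            }
            sizes.add(g.members.size());
        }
        // Once the last restart flips the group to STATIC, earlier instances can rejoin.
        for (int i : rejected) {
            g.tryJoin(i, Protocol.STATIC);
        }
        sizes.add(g.members.size());
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(simulate(10)); // [9, 8, 7, 6, 5, 4, 3, 2, 1, 1, 10]
    }
}
```

The dip to a single member is the non-smooth transition James describes. Jason's later proposal at the top of this thread avoids it by letting static and dynamic members coexist in the same group.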