Hey All,

Nice to see some solid progress on this. It sounds like one of the
complications is allowing static and dynamic registration to coexist. I'm
wondering if we can do something like the following:

1. Statically registered members (those joining the group with a non-null
`member.name`) maintain a session with the coordinator just like dynamic
members.
2. If a session is active for a static member when a rebalance begins, then
basically we'll keep the current behavior. The rebalance will await the
static member joining the group.
3. If a static member does not have an active session, then the coordinator
will not wait for it to join, but will still include it in the rebalance.
The coordinator will forward the cached subscription information to the
leader and will cache the assignment after the rebalance completes. (Note
that we still have the generationId to fence offset commits from a static
zombie if the assignment changes.)
4. When a static member leaves the group or has its session expire, no
rebalance is triggered. Instead, we can begin a timer to expire the static
registration. This would be a fairly long timeout (say, 30 minutes).
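
To make the four points above concrete, here is a rough sketch in Python of
the coordinator-side bookkeeping. It is only an illustration under stated
assumptions: `GroupSketch`, `Member`, the method names, and the 30-minute
constant are all made up for this example and are not part of the broker
code or the KIP. What happens when a static registration finally expires is
not spelled out above, so the sketch simply drops the member and reports it.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

# Hypothetical value; the proposal only says "like 30 minutes".
STATIC_REGISTRATION_TIMEOUT_S = 30 * 60


@dataclass
class Member:
    member_id: str
    member_name: Optional[str]   # non-null means statically registered
    subscription: List[str]      # cached from the member's last JoinGroup
    session_active: bool = True
    assignment: List[str] = field(default_factory=list)
    registration_expires_at: Optional[float] = None

    @property
    def is_static(self) -> bool:
        return self.member_name is not None


class GroupSketch:
    """Illustrative model of the group state, not the real coordinator."""

    def __init__(self) -> None:
        self.members: Dict[str, Member] = {}
        self.generation_id = 0

    def add(self, member: Member) -> None:
        self.members[member.member_id] = member

    def on_session_end(self, member_id: str, now: float) -> bool:
        """Handle a leave or session expiry; return True if a rebalance is needed."""
        member = self.members[member_id]
        member.session_active = False
        if member.is_static:
            # Point 4: no rebalance, only start the registration expiry timer.
            member.registration_expires_at = now + STATIC_REGISTRATION_TIMEOUT_S
            return False
        # Dynamic members keep today's behavior: remove and rebalance.
        del self.members[member_id]
        return True

    def expire_registrations(self, now: float) -> List[str]:
        """Drop static members whose timer has run out (assumed behavior)."""
        expired = sorted(
            mid for mid, m in self.members.items()
            if m.is_static and not m.session_active
            and m.registration_expires_at is not None
            and now >= m.registration_expires_at
        )
        for mid in expired:
            del self.members[mid]
        return expired

    def members_to_await(self) -> List[str]:
        """Points 2 and 3: the rebalance waits only for members with a session."""
        return sorted(mid for mid, m in self.members.items() if m.session_active)

    def subscriptions_for_leader(self) -> Dict[str, List[str]]:
        """Point 3: inactive static members are included via cached subscriptions."""
        return {mid: m.subscription for mid, m in self.members.items()}

    def complete_rebalance(self, assignments: Dict[str, List[str]]) -> None:
        """Bump the generation and cache assignments, even for absent static members."""
        self.generation_id += 1
        for mid, assignment in assignments.items():
            self.members[mid].assignment = assignment
```

The generation bump in `complete_rebalance` is what lets the coordinator
fence offset commits from a static zombie whose assignment has changed.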

So basically static members participate in all rebalances regardless of
whether they have an active session. In a given rebalance, some of the
members may be static and some dynamic. The group leader can differentiate
the two based on the presence of the `member.name` (we have to add this to
the JoinGroupResponse). Generally speaking, we would choose leaders
preferentially from the active members that support the latest JoinGroup
protocol and are using static membership. If we have to choose a leader
with an old version, however, it would see all members in the group (static
or dynamic) as dynamic members and perform the assignment as usual.
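
As a rough illustration of the leader preference described above, here is a
small Python sketch. Everything in it is hypothetical: `Candidate`,
`choose_leader`, `member_view_for_leader`, and the version number `4` are
placeholders chosen for the example, and the ranking is just one reading of
"prefer active members on the latest JoinGroup protocol that use static
membership".

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

# Hypothetical: the first JoinGroup version that carries member.name.
LATEST_JOIN_GROUP_VERSION = 4


@dataclass(frozen=True)
class Candidate:
    member_id: str
    member_name: Optional[str]   # non-null means static membership
    join_group_version: int
    session_active: bool


def choose_leader(members: List[Candidate]) -> Candidate:
    """Pick a leader among active members, preferring latest-protocol static ones."""
    active = [m for m in members if m.session_active]
    if not active:
        raise ValueError("no active member can act as leader")

    def rank(m: Candidate):
        # Tuples compare element by element: protocol support first, then static.
        return (m.join_group_version >= LATEST_JOIN_GROUP_VERSION,
                m.member_name is not None)

    # max() returns the first maximal element, so ties fall back to list order.
    return max(active, key=rank)


def member_view_for_leader(leader: Candidate,
                           members: List[Candidate]) -> Dict[str, Optional[str]]:
    """The member.name values the leader would see in its JoinGroupResponse."""
    if leader.join_group_version < LATEST_JOIN_GROUP_VERSION:
        # An old leader has no member.name field, so every member looks dynamic.
        return {m.member_id: None for m in members}
    return {m.member_id: m.member_name for m in members}
```

With an old-version leader the view contains no names at all, which matches
the fallback case: the assignment is performed as usual, as if every member
were dynamic.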

Would that work?

-Jason


On Thu, Aug 23, 2018 at 5:26 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Boyang,
>
> Thanks for the updated proposal, a few questions:
>
> 1. Where will "change-group-timeout" be communicated to the broker? Will
> that be a new field in the JoinGroupRequest, or are we going to piggy-back
> on the existing session-timeout field (assuming that the original value
> will not be used anywhere in the static membership any more)?
>
> 2. "However, if the consumer takes longer than session timeout to return,
> we shall still trigger rebalance but it could still try to catch
> `change-group-timeout`.": what does this mean? I thought your proposal is
> that for static memberships, the broker will NOT trigger a rebalance even
> after the session timeout has been detected, but only after
> change-group-timeout, which is supposed to be longer than session-timeout
> (still to be defined)?
>
> 3. "A join group request with member.name set will be treated as
> `static-membership` strategy", in this case, how would the switch from
> dynamic to static happen, since whoever changed the member.name to not-null
> will be rejected, right?
>
> 4. "just erase the cached mapping, and wait for session timeout to trigger
> rebalance should be sufficient." this is also a bit unclear to me: who will
> erase the cached mapping? Since it is on the broker-side I assume that
> broker has to do it. Are you suggesting to use a new request for it?
>
> 5. "Halfway switch": following 3) above, if your proposal is basically to
> let "first join-request wins", and the strategy will stay as is until all
> members are gone, then this will also not happen, since whoever uses a
> different strategy from the first member to send a join-group request will
> be rejected, right?
>
>
> Guozhang
>
>
> On Tue, Aug 21, 2018 at 9:28 AM, John Roesler <j...@confluent.io> wrote:
>
> > This sounds good to me!
> >
> > Thanks for the time you've spent on it,
> > -John
> >
> > On Tue, Aug 21, 2018 at 12:13 AM Boyang Chen <bche...@outlook.com> wrote:
> >
> > > Thanks Matthias for the input. Sorry, I was busy recently and haven't had
> > > time to update this thread. To summarize what we have come up with so
> > > far, here is a draft updated plan:
> > >
> > >
> > > Introduce a new config called `member.name` which is supposed to be
> > > provided uniquely by the consumer client. The broker will maintain a cache
> > > with [key:member.name, value:member.id]. A join group request with
> > > member.name set will be treated as `static-membership` strategy, and the
> > > group will then reject any join group request without member.name. So
> > > this coordination change will be differentiated from the
> > > `dynamic-membership` protocol we currently have.
> > >
> > >
> > > When handling static join group request:
> > >
> > >   1.   The broker will check the membership to see whether this is a new
> > > member. If new, the broker allocates a unique member id, caches the
> > > mapping and moves to the rebalance stage.
> > >   2.   Following 1, if this is an existing member, the broker will not
> > > change group state, and will return its cached member.id and current
> > > assignment (unless this is the leader, in which case we shall trigger a
> > > rebalance).
> > >   3.   Although Guozhang has mentioned we could rejoin with the member
> > > name and id pair, I think for the join group request it is ok to leave
> > > member id blank, as member name is the unique identifier. In the commit
> > > offset request we *must* have both.
> > >
> > >
> > > When handling commit offset request, if enabled with static membership,
> > > each time the commit request must have both member.name and member.id to
> > > be identified as a `certificated member`. If not, this means there are
> > > duplicate consumer members with the same member name, and the request
> > > will be rejected to guarantee consumption uniqueness.
> > >
> > >
> > > When rolling restart/shutting down gracefully, the client will send a
> > > leave group request (static membership mode). In static membership, we
> > > will also define `change-group-timeout`, provided by the leader, to hold
> > > off the rebalance. So we will wait for all the members to rejoin the
> > > group and do exactly one rebalance, since all members are expected to
> > > rejoin within the timeout. If the consumer crashes, the join group
> > > request from the restarted consumer will be recognized as an existing
> > > member and be handled as above condition 1;
> > > However, if the consumer takes longer than session timeout to return, we
> > > shall still trigger rebalance but it could still try to catch
> > > `change-group-timeout`. If it fails to catch the second timeout, its
> > > cached state on the broker will be garbage collected, and it will trigger
> > > a new rebalance when it finally joins.
> > >
> > >
> > > And consider the switch between dynamic and static membership.
> > >
> > >   1.  Dynamic to static: the first joiner shall revise the membership to
> > > static and wait for all the current members to restart, since their
> > > membership is still dynamic. Here our assumption is that the restart
> > > process shouldn't take a long time, as a long restart breaks the
> > > `rebalance timeout` in whatever membership protocol we are using. Before
> > > restart, all dynamic member join requests will be rejected.
> > >   2.  Static to dynamic: this is more like a downgrade which should be
> > > smooth: just erase the cached mapping, and wait for session timeout to
> > > trigger rebalance should be sufficient. (Fallback to current behavior)
> > >   3.  Halfway switch: a corner case is when some clients keep dynamic
> > > membership while some keep static membership. This will cause the group
> > > to rebalance forever without progress, because the dynamic and static
> > > states keep bouncing each other. This could guarantee that we will not
> > > make the consumer group work in a wrong state by having half static and
> > > half dynamic.
> > >
> > > To guarantee correctness, we will also push the member name/id pair to
> > > the `__consumer_offsets` topic (as Matthias pointed out) and upgrade the
> > > API version. These details will be further discussed back in the KIP.
> > >
> > >
> > > Are there any concerns about this high-level proposal? Just want to
> > > reiterate the core idea of the KIP: "If the broker recognizes this
> > > consumer as an existing member, it shouldn't trigger a rebalance".
> > >
> > > Thanks a lot for everyone's input! I feel this proposal is much more
> > > robust than the previous one!
> > >
> > >
> > > Best,
> > >
> > > Boyang
> > >
> > > ________________________________
> > > From: Matthias J. Sax <matth...@confluent.io>
> > > Sent: Friday, August 10, 2018 2:24 AM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > specifying member id
> > >
> > > Hi,
> > >
> > > thanks for the detailed discussion. I learned a lot about internals again :)
> > >
> > > I like the idea of a user config `member.name` and to keep `member.id`
> > > internal. Also agree with Guozhang, that reusing `client.id` might not
> > > be a good idea.
> > >
> > > To clarify the algorithm, each time we generate a new `member.id`, we
> > > also need to update the "group membership" information (ie, mapping
> > > [member.id, Assignment]), right? Ie, the new `member.id` replaces the
> > > old entry in the cache.
> > >
> > > I also think, we need to preserve the `member.name -> member.id` mapping
> > > in the `__consumer_offsets` topic. The KIP should mention this IMHO.
> > >
> > > For changing the default value of config `leave.group.on.close`. I agree
> > > with John, that we should not change the default config, because it
> > > would impact all consumer groups with dynamic assignment. However, I
> > > think we can document, that if static assignment is used (ie,
> > > `member.name` is configured) we never send a LeaveGroupRequest
> > > regardless of the config. Note, that the config is internal, so not sure
> > > how to document this in detail. We should not expose the internal config
> > > in the docs.
> > >
> > > About upgrading: why do we need to have two rolling bounces and encode
> > > "static" vs "dynamic" in the JoinGroupRequest?
> > >
> > > If we upgrade an existing consumer group from dynamic to static, I don't
> > > see any reason why both should not work together and single rolling
> > > bounce would not be sufficient? If we bounce the first consumer and
> > > switch from dynamic to static, it sends a `member.name` and the broker
> > > registers the [member.name, member.id] in the cache. Why would this
> > > interfere with all other consumers that use dynamic assignment?
> > >
> > > Also, Guozhang mentioned that for all other requests, we need to check if
> > > the mapping [member.name, member.id] contains the sent `member.id` -- I
> > > don't think this is necessary -- it seems to be sufficient to check the
> > > `member.id` from the [member.id, Assignment] mapping as we do today --
> > > thus, checking `member.id` does not require any change IMHO.
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 8/7/18 7:13 PM, Guozhang Wang wrote:
> > > > @James
> > > >
> > > > What you described is true: the transition from dynamic to static
> > > > membership is not thought through yet. But I do not think it is an
> > > > impossible problem: note that we indeed moved the offset commit from ZK
> > > > to kafka coordinator in 0.8.2 :) The migration plan is to first do
> > > > double-commits on both zk and coordinator, and then do a second round
> > > > to turn the zk off.
> > > >
> > > > So just to throw a wild idea here: also following a two-rolling-bounce
> > > > manner, in the JoinGroupRequest we can set the flag to "static" while
> > > > keeping the registry-id field empty still, in this case, the coordinator
> > > > still follows the logic of "dynamic", accepting the request while
> > > > allowing the protocol to be set to "static"; after the first rolling
> > > > bounce, the group protocol is already "static", then a second rolling
> > > > bounce is triggered and this time we set the registry-id.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Tue, Aug 7, 2018 at 1:19 AM, James Cheng <wushuja...@gmail.com> wrote:
> > > >
> > > >> Guozhang, in a previous message, you said this:
> > > >>
> > > >>> On Jul 30, 2018, at 3:56 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > > >>>
> > > >>> 1. We bump up the JoinGroupRequest with additional fields:
> > > >>>
> > > >>>  1.a) a flag indicating "static" or "dynamic" membership protocols.
> > > >>>  1.b) with "static" membership, we also add the pre-defined member id.
> > > >>>  1.c) with "static" membership, we also add an optional
> > > >>> "group-change-timeout" value.
> > > >>>
> > > >>> 2. On the broker side, we enforce only one of the two protocols for all
> > > >>> group members: we accept the protocol on the first joined member of the
> > > >>> group, and if later joining members indicate a different membership
> > > >>> protocol, we reject it. If the group-change-timeout value was different
> > > >>> to the first joined member, we reject it as well.
> > > >>
> > > >>
> > > >> What will happen if we have an already-deployed application that wants
> > > >> to switch to using static membership? Let’s say there are 10 instances
> > > >> of it. As the instances go through a rolling restart, they will switch
> > > >> from dynamic membership (the default?) to static membership. As each one
> > > >> leaves the group and restarts, they will be rejected from the group
> > > >> (because the group is currently using dynamic membership). The group
> > > >> will shrink down until there is 1 node handling all the traffic. After
> > > >> that one restarts, the group will switch over to static membership.
> > > >>
> > > >> Is that right? That means that the transition plan from dynamic to
> > > >> static membership isn’t very smooth.
> > > >>
> > > >> I’m not really sure what can be done in this case. This reminds me of
> > > >> the transition plans that were discussed for moving from zookeeper-based
> > > >> consumers to kafka-coordinator-based consumers. That was also hard, and
> > > >> ultimately we decided not to build that.
> > > >>
> > > >> -James
> > > >>
> > > >>
> > > >
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
