@Jason,

Good point about disconnects. And with that I think I agree that a registry
id maybe a better idea to enable fencing than validating on host / port.


Guozhang


On Mon, Jul 30, 2018 at 5:40 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Guozhang,
>
> Thanks for the detailed response. Really quick about the fencing issue, I
> think host/port will not be sufficient because it cannot handle
> disconnects. For example, if the coordinator moves to another broker, then
> there is no way we'd be able to guarantee the same host/port. Currently we
> try to avoid rebalancing when the coordinator moves. That said, I agree in
> principle with the "first comer wins" approach you've suggested. Basically
> a member is only removed from the group if its session expires or it leaves
> the group explicitly.
>
> -Jason
>
> On Mon, Jul 30, 2018 at 4:24 PM, Mike Freyberger <mfreyber...@appnexus.com
> >
> wrote:
>
> > Guozhang,
> >
> > Thanks for giving us a great starting point.
> >
> > A few questions that come to mind right away:
> >
> > 1) What do you think a reasonable group-change-timeout would be? I am
> > thinking on the order of minutes (5 minutes?)
> >
> > 2) Will the nodes that are still alive continue to make progress during a
> > static membership rebalance? I believe during a rebalance today all
> > consumers wait for the SyncGroupResponse before continuing to read data
> > from the brokers. If that is the case, I think it'd be ideal all nodes
> that
> > are still alive during a static group membership change to continue to
> make
> > progress as if nothing happened such that are there is no impact to the
> > majority of the group when one node is bounced (quick restart).
> >
> > 3) Do you think an explicit method for forcing a rebalance would be
> > needed? I am thinking of a scenario such as a disk failure on a node, and
> > that node will definitely not come back. Rather than waiting up to the
> > group-change-timeout, I think it'd be good an admin to force a rebalance
> > rather than wait the full group-change-timeout. Maybe this is an over
> > optimization, but I think certain use cases would benefit from static
> group
> > membership with the ability to force a rebalance at any time.
> >
> > Best,
> >
> > Mike
> >
> > On 7/30/18, 6:57 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >
> >     Hello Boyang / Jason / Mike,
> >
> >     Thanks for your thoughtful inputs! Regarding the fencing issue, I've
> >     thought about leveraging the epoch notion from PID of transactional
> >     messaging before, but since in this proposal we do not always require
> >     member ids from clients, and hence could have a mixed of
> user-specified
> >     member ids with coordinator-generated member ids, the epoch idea may
> > not be
> >     very well suited for this scenario. Of course, we can tighten the
> > screws a
> >     bit by requiring that for a given consumer group, all consumers must
> > either
> >     be giving their member ids or leveraging on consumer coordinator to
> > give
> >     member ids, which does not sound a very strict requirement in
> > practice, and
> >     all we need to do is the add a new field in the join group request
> (we
> > are
> >     proposing to bump up its version anyways). And hence I've also
> thought
> >     about another simple fencing approach, aka "first comer wins", that
> is
> > to
> >     pass in the host / port information from KafkaApis to
> GroupCoordinator
> > to
> >     validate if it matches the existing member id's cached host / post.
> It
> > does
> >     not always guarantee that we fence the right zombies because of
> "first
> >     comer wins" (think of a scenario where the right client gets kicked
> > out,
> >     and then before it re-joins the actual zombie with the same member id
> > gets
> >     joined), but as I mentioned previously it will poke some leaks into
> the
> >     code hierarchy a bit so I'm also hesitant to do it. If people think
> it
> > is
> >     indeed a must-have than good-to-have, I'd suggest we leverage on
> > host-port
> >     than using the epoch mechanism then.
> >
> >     ------------------------------------------
> >
> >     As for the more general idea of having a static membership protocol
> to
> >     better integrated with Cloud environment like k8s, I think the first
> > idea
> >     may actually be better fit with it.
> >
> >     Just a quick summary of what rebalance issues we face today:
> >
> >     1. Application start: when multi-instance application is started,
> > multiple
> >     rebalances are triggered to migrate states to newly started instances
> > since
> >     not all instances are joining at the same time. NOTE that KIP-134
> >     <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 134%3A+Delay+initial+consumer+group+rebalance>
> >     is
> >     targeted for this issue, but as an after-thought it may not be the
> > optimal
> >     solution.
> >     2. Application shutdown: similarly to 1), when multi-instance
> > application
> >     is shutting down, multiple rebalances are triggered.
> >     3. Application scale out: when a new instance is started, one
> > rebalance is
> >     executed to shuffle all assignment, rather than just a "partial"
> > shuffling
> >     of some of the members.
> >     4. Application scale in: similarly to 3), when an existing instance
> >     gracefully shutdown, once rebalance is executed to shuffle all
> > assignment.
> >     5. Application instance bounce (upgrade, config change etc): one
> > instance
> >     shut down and then restart, it will trigger two rebalances. NOTE that
> >     disabling leave-group is targeted for this issue.
> >     6. Application instance failure: one instance failed, and probably a
> > new
> >     instance start to take its assignment (e.g. k8s), it will trigger two
> >     rebalances. The different with 3) above is that new instance would
> not
> > have
> >     local cached tasks.
> >
> >
> >     Among them, I think 1/2/5/6 could potentially be grouped together as
> >     "static membership"; 4/5 could be grouped as another category, of
> > allowing
> >     "incremental rebalance" or "partial rebalance" than full-rebalance.
> Of
> >     course, having incremental rebalances can help on 1/2/5/6 as well to
> > reduce
> >     the cost of each unnecessary rebalance, but ideally we want NO
> > rebalances
> >     at all for these cases, which will be more true with k8s / etc
> > integrations
> >     or static memberships.
> >
> >     ------------------------------------------
> >
> >     So just to throw in a sketchy idea following this route for 1/2/5/6
> for
> >     brainstorming kick-off:
> >
> >
> >     1. We bump up the JoinGroupRequest with additional fields:
> >
> >       1.a) a flag indicating "static" or "dynamic" membership protocols.
> >       1.b) with "static" membership, we also add the pre-defined member
> id.
> >       1.c) with "static" membership, we also add an optional
> >     "group-change-timeout" value.
> >
> >     2. On the broker side, we enforce only one of the two protocols for
> all
> >     group members: we accept the protocol on the first joined member of
> the
> >     group, and if later joining members indicate a different membership
> >     protocol, we reject it. If the group-change-timeout value was
> > different to
> >     the first joined member, we reject it as well.
> >
> >     3. With dynamic membership, nothing is changed; with static
> > membership, we
> >     do the following:
> >
> >       3.a) never assign member ids, instead always expect the joining
> > members
> >     to come with their own member id; we could do the fencing based on
> > host /
> >     port here.
> >       3.b) upon receiving the first join group request, use the
> >     "group-change-timeout" instead of the session-timeout as rebalance
> > timeout
> >     to wait for other members to join. This is for 1) above.
> >       3.c) upon receiving a leave-group request, use the
> > "group-change-timeout"
> >     to wait for more members to leave group as well, or for the left
> > members to
> >     re-join. After the timeout we trigger a rebalance with whatever have
> > left
> >     in the members list. This is for all 2 (expecting other members to
> send
> >     leave-group) and 5/6 (expecting the left member to re-join).
> >
> >     4. As a result, we will deprecate KIP-134 and disable-on-leave-group
> as
> >     well.
> >
> >
> >     The key idea is that, with "static" membership, groups should be
> > created or
> >     terminated as a whole, and dynamic member changes are not expected
> > often.
> >     Hence we would not react to those membership-changing events
> > immediately
> >     but wait for longer specified time expecting some other systems like
> > k8s
> >     will resume the group members. WDYT?
> >
> >
> >     Guozhang
> >
> >
> >     On Mon, Jul 30, 2018 at 3:05 PM, Mike Freyberger <
> > mfreyber...@appnexus.com>
> >     wrote:
> >
> >     > Jason,
> >     >
> >     > I really appreciate the broader conversation that you are bringing
> > up here.
> >     >
> >     > I've been working on an application that does streaming joins for a
> > while
> >     > now, and we face a similar issue with group membership being
> > dynamic. We
> >     > are currently using our own StickyAssignor and take special care
> > during
> >     > rolling restarts to make sure consumer assignments do not change.
> >     >
> >     > I think a feature that allows for group membership to be fixed,
> > along with
> >     > a CLI for adding or removing a node from the group be ideal. This
> > reminds
> >     > me of some of the work by the DynamoDB team about 10 years back
> when
> > they
> >     > differentiated transient failures from permanent failures to deal
> > with this
> >     > problems like this.
> >     >
> >     > Best,
> >     >
> >     > Mike
> >     >
> >     > On 7/30/18, 5:36 PM, "Jason Gustafson" <ja...@confluent.io> wrote:
> >     >
> >     >     Hi Boyang,
> >     >
> >     >     Thanks for the response. I think the main point I was trying to
> > make
> >     > is the
> >     >     need for fencing. I am not too concerned about how to generate
> a
> >     > unique id
> >     >     on the client side. The approach you suggested for streams
> seems
> >     >     reasonable. However, any time you reuse an id, you need to be
> > careful
> >     > that
> >     >     there is only one instance that can use it at any time. We are
> > always
> >     >     running into problems where a previous instance of an
> > application comes
> >     >     back to life unexpectedly after we had already presumed it was
> > dead.
> >     >     Fencing ensures that even if this happens, it cannot do any
> > damage. I
> >     > would
> >     >     say that some protection from zombies is a requirement here.
> >     >
> >     >     The second point was more abstract and mainly meant to initiate
> > some
> >     >     discussion. We have gone through several iterations of
> > improvements to
> >     > try
> >     >     and reduce the rebalancing in consumer applications. We started
> > out
> >     > trying
> >     >     to tune the session timeout. We have added an internal config
> to
> > skip
> >     >     leaving the group when streams shuts down. The broker now has a
> > config
> >     > to
> >     >     delay rebalances in case all consumers join at about the same
> > time. The
> >     >     approach in this KIP is a step in a more principled direction,
> > but it
> >     > still
> >     >     feels like we are making this unnecessarily hard on ourselves
> by
> >     > insisting
> >     >     that group membership is a dynamic concept. In practice, the
> > number of
> >     >     nodes dedicated to an application tends to remain fixed for
> long
> >     > periods of
> >     >     time and only scales up or down when needed. And these days
> > you've got
> >     >     frameworks like kubernetes which can automatically provision
> new
> > nodes
> >     > if
> >     >     one fails. So the argument for dynamic membership is becoming
> > weaker
> >     > in my
> >     >     opinion. This KIP is basically trying to impose a small degree
> of
> >     > static
> >     >     membership anyway so that rolling restarts do not change
> > membership.
> >     >     Anyway, curious to hear some thoughts about this from you and
> the
> >     > others
> >     >     who work on streams.
> >     >
> >     >     Thanks,
> >     >     Jason
> >     >
> >     >
> >     >     On Sat, Jul 28, 2018 at 4:44 PM, Boyang Chen <
> > bche...@outlook.com>
> >     > wrote:
> >     >
> >     >     > Thanks for the replies, James and Jason. Let me try to
> > summarize your
> >     >     > concerns.
> >     >     >
> >     >     >
> >     >     > I think James' question is primarily the severity of user
> > using this
> >     >     > config wrongly. The impact would be that the same member id
> > being
> >     > used by
> >     >     > multiple or even all of the consumers. The assignment
> protocol
> >     > couldn't
> >     >     > distinguish any of the overlapping consumers, thus assigning
> > the
> >     > exact same
> >     >     > partitions multiple times to different consumers. I would say
> > the
> >     > processed
> >     >     > result would be including a lot of duplicates and unnecessary
> > heavy
> >     > load on
> >     >     > the client side, The correctness will depend on the user
> logic,
> >     > however I'm
> >     >     > pessimistic.
> >     >     >
> >     >     >
> >     >     > Although the impact is very high, the symptom is not hard to
> > triage,
> >     >     > because user could visualize consumer identity overlapping
> > fairly
> >     > easily by
> >     >     > exported consumer metrics. On the user standpoint, they would
> > be
> >     > fully
> >     >     > aware of the potential erratic status before enabling "
> > member.id"
> >     >     > configuration IMO. Let me know your thoughts James!
> >     >     >
> >     >     >
> >     >     > Next is Jason's suggestion. Jason shared a higher viewpoint
> and
> >     > pointed
> >     >     > out the problem that we need to solve is to maintain "a
> strong
> > bias
> >     > towards
> >     >     > being able to reuse previous state". The proposed approach is
> > to
> >     > separate
> >     >     > the notion of consumer membership and consumer identity.
> >     >     >
> >     >     >
> >     >     > The original idea of this KIP was on the Stream application,
> > so I
> >     >     > understand that the identities of multiple consumers belong
> to
> > one
> >     >     > instance, where each Stream thread will be using one
> dedicated
> > main
> >     >     > consumer. So in a Stream use case, we could internally
> generate
> >     > member id
> >     >     > with USER_DEFINED_ID + STREAM_THREAD_ID.
> >     >     >
> >     >     >
> >     >     > In pure consumer use case, this could be a little bit
> > challenging
> >     > since
> >     >     > user could arbitrarily initiate multiple consumers on the
> same
> >     > instance
> >     >     > which is out of our library control. This could add up the
> >     > possibility of
> >     >     > member id collision. So instead of making developers life
> > easier,
> >     >     > introducing member id config could break the existing code
> > logic and
> >     > take
> >     >     > long time to understand and fix. Although I still assume this
> > is an
> >     >     > advanced config, user may use member id config even before
> > they fully
> >     >     > understand the problem, and use the same set of
> initialization
> > logic
> >     > cross
> >     >     > multiple consumers on the same instance.
> >     >     >
> >     >     >
> >     >     > I hope I have explained my understanding of the pros and cons
> > of
> >     > this KIP
> >     >     > better. Remember the core argument of this KIP: If the broker
> >     > recognize
> >     >     > this consumer as an existing member, it shouldn't trigger
> > rebalance.
> >     > If we
> >     >     > build our discussion on top of this argument, the client
> > management
> >     > of
> >     >     > group membership could be tricky at first, but considering
> our
> >     > original
> >     >     > motivation to leader-follower rebalance model, I feel that
> > having
> >     > broker to
> >     >     > create membership info and let client maintain would be less
> >     > appealing and
> >     >     > fragile. Having client generate membership data could build
> up
> >     >     > source-of-truth model and streamline the current
> architecture.
> > We
> >     > need also
> >     >     > consider flexibility introduced by this KIP for cloud users
> to
> >     > coordinate
> >     >     > consumer/stream instances more freely. Honestly, I'm
> > interested in
> >     > Jason's
> >     >     > registration id proposal and open to more voices, but I feel
> it
> >     > would be
> >     >     > more complex than the current KIP for user to understand.
> Hope
> > this
> >     > makes
> >     >     > sense, Jason.
> >     >     >
> >     >     >
> >     >     > Thank you again for the feedback!
> >     >     >
> >     >     >
> >     >     > Best,
> >     >     >
> >     >     > Boyang
> >     >     >
> >     >     >
> >     >     > ________________________________
> >     >     > From: Jason Gustafson <ja...@confluent.io>
> >     >     > Sent: Saturday, July 28, 2018 6:50 AM
> >     >     > To: dev
> >     >     > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > rebalances
> >     > by
> >     >     > specifying member id
> >     >     >
> >     >     > Hey Boyang,
> >     >     >
> >     >     > Thanks for the KIP. I think my main question is in the same
> > vein as
> >     > James'.
> >     >     > The problem is that the coordinator needs to be able to
> > identify
> >     > which
> >     >     > instance of a particular memberId is the active one. For EOS,
> > each
> >     >     > transactionalId gets an epoch. When a new producer is
> started,
> > it
> >     > bumps the
> >     >     > epoch which allows the transaction coordinator to fence off
> any
> >     > zombie
> >     >     > instances which may try to continue doing work with the old
> > epoch.
> >     > It seems
> >     >     > like we need a similar protection for consumer members.
> >     >     >
> >     >     > Suppose for example that we distinguish between a
> registration
> > id
> >     > which is
> >     >     > provided by the user and a member id which is assigned
> > uniquely by
> >     > the
> >     >     > coordinator. In the JoinGroup request, both the registration
> > id and
> >     > the
> >     >     > member id are provided. When a consumer is first started, it
> > doesn't
> >     > know
> >     >     > the memberId, so it it provides only the registration id. The
> >     > coordinator
> >     >     > can then assign a new memberId and invalidate the previous
> one
> > that
> >     > was
> >     >     > associated with the registration id. This would then fence
> off
> > the
> >     > previous
> >     >     > instance which was still trying to use the member id.
> >     >     >
> >     >     > Taking a little bit of a step back, I think the main
> > observation in
> >     > this
> >     >     > KIP is that applications with heavy local state need to have
> a
> >     > strong bias
> >     >     > toward being able to reuse that state. It is a bit like Kafka
> > itself
> >     > in the
> >     >     > sense that a replica is not moved just because the broker is
> >     > shutdown as
> >     >     > the cost of moving the log is extremely high. I'm wondering
> if
> > we
> >     > need to
> >     >     > think about streams applications in a similar way. Should
> > there be a
> >     > static
> >     >     > notion of the members of the group so that streams can make
> >     > rebalancing
> >     >     > decisions more easily without depending so heavily on
> transient
> >     > membership?
> >     >     > I feel the hacks we've put in place in some cases to avoid
> >     > rebalances are a
> >     >     > bit brittle. Delaying group joining for example is an example
> > of
> >     > this. If
> >     >     > you knew ahead of time who the stable members of the group
> > were,
> >     > then this
> >     >     > would not be needed. Anyway, just a thought.
> >     >     >
> >     >     > Thanks,
> >     >     > Jason
> >     >     >
> >     >     >
> >     >     >
> >     >     > On Fri, Jul 27, 2018 at 1:58 PM, James Cheng <
> > wushuja...@gmail.com>
> >     > wrote:
> >     >     >
> >     >     > > When you say that it will "break", what does this breakage
> > look
> >     > like?
> >     >     > Will
> >     >     > > the consumer-group be non-functional? Will just those
> > instances be
> >     >     > > non-functional? Or will the group be functional, but the
> >     > rebalancing be
> >     >     > > non-optimal and require more round-trips/data-transfer?
> > (similar
> >     > to the
> >     >     > > current algorithm)
> >     >     > >
> >     >     > > I'm trying to assess the potential for user-error and the
> > impact of
> >     >     > > user-error.
> >     >     > >
> >     >     > > -James
> >     >     > >
> >     >     > > > On Jul 27, 2018, at 11:25 AM, Boyang Chen <
> > bche...@outlook.com>
> >     > wrote:
> >     >     > > >
> >     >     > > > Hey James,
> >     >     > > >
> >     >     > > >
> >     >     > > > the algorithm is relying on client side to provide unique
> >     > consumer
> >     >     > > member id. It will break unless we enforce some sort of
> > validation
> >     > (host
> >     >     > +
> >     >     > > port) on the server side. To simplify the first version, we
> > do not
> >     > plan
> >     >     > to
> >     >     > > enforce validation. A good comparison would be the EOS
> > producer
> >     > which is
> >     >     > in
> >     >     > > charge of generating unique transaction id sequence. IMO
> for
> > broker
> >     >     > logic,
> >     >     > > the tolerance of client side error is not unlimited.
> >     >     > > >
> >     >     > > >
> >     >     > > > Thank you!
> >     >     > > >
> >     >     > > >
> >     >     > > > ________________________________
> >     >     > > > From: James Cheng <wushuja...@gmail.com>
> >     >     > > > Sent: Saturday, July 28, 2018 1:26 AM
> >     >     > > > To: dev@kafka.apache.org
> >     >     > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> >     > rebalances by
> >     >     > > specifying member id
> >     >     > > >
> >     >     > > >
> >     >     > > >> On Jul 26, 2018, at 11:09 PM, Guozhang Wang <
> > wangg...@gmail.com
> >     > >
> >     >     > wrote:
> >     >     > > >>
> >     >     > > >> Hi Boyang,
> >     >     > > >>
> >     >     > > >> Thanks for the proposed KIP. I made a pass over the wiki
> > and
> >     > here are
> >     >     > > some
> >     >     > > >> comments / questions:
> >     >     > > >>
> >     >     > > >> 1. In order to preserve broker compatibility, we need to
> > make
> >     > sure the
> >     >     > > >> broker version discovery logic can be integrated with
> > this new
> >     > logic.
> >     >     > > I.e.
> >     >     > > >> if a newer versioned consumer is talking to an older
> > versioned
> >     > broker
> >     >     > > who
> >     >     > > >> does not recognize V4, the client needs to downgrade its
> >     >     > > JoinGroupRequest
> >     >     > > >> version to V3 and not setting the member-id
> specifically.
> > You
> >     > can
> >     >     > take a
> >     >     > > >> look at the ApiVersionsRequest and see how to work with
> > it.
> >     >     > > >>
> >     >     > > >> 2. There may exist some manners to validate that two
> > different
> >     > clients
> >     >     > > do
> >     >     > > >> not send with the same member id, for example if we pass
> > along
> >     > the
> >     >     > > >> host:port information from KafkaApis to the
> > GroupCoordinator
> >     >     > interface.
> >     >     > > But
> >     >     > > >> I think this is overly complicated the logic and may not
> >     > worthwhile
> >     >     > than
> >     >     > > >> relying on users to specify unique member ids.
> >     >     > > >
> >     >     > > > Boyang,
> >     >     > > >
> >     >     > > > Thanks for the KIP! How will the algorithm behave if
> > multiple
> >     > consumers
> >     >     > > provide the same member id?
> >     >     > > >
> >     >     > > > -James
> >     >     > > >
> >     >     > > >> 3. Minor: you would need to bumping up the version of
> >     >     > JoinGroupResponse
> >     >     > > to
> >     >     > > >> V4 as well.
> >     >     > > >>
> >     >     > > >> 4. Minor: in the wiki page, you need to specify the
> actual
> >     > string
> >     >     > value
> >     >     > > for
> >     >     > > >> `MEMBER_ID`, for example "member.id".
> >     >     > > >>
> >     >     > > >> 5. When this additional config it specified by users, we
> > should
> >     >     > consider
> >     >     > > >> setting the default of internal
> > `LEAVE_GROUP_ON_CLOSE_CONFIG` to
> >     >     > false,
> >     >     > > >> since otherwise its effectiveness would be less.
> >     >     > > >>
> >     >     > > >>
> >     >     > > >> Guozhang
> >     >     > > >>
> >     >     > > >>
> >     >     > > >>
> >     >     > > >>> On Thu, Jul 26, 2018 at 9:20 PM, Boyang Chen <
> >     > bche...@outlook.com>
> >     >     > > wrote:
> >     >     > > >>>
> >     >     > > >>> Hey friends,
> >     >     > > >>>
> >     >     > > >>>
> >     >     > > >>> I would like to open a discussion thread on KIP-345:
> >     >     > > >>>
> >     >     > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 345%3A
> >     >     > > >>> +Reduce+multiple+consumer+rebalances+by+specifying+
> > member+id
> >     >     > > >>>
> >     >     > > >>>
> >     >     > > >>> This KIP is trying to resolve multiple rebalances by
> >     > maintaining the
> >     >     > > >>> consumer member id across rebalance generations. I have
> >     > verified the
> >     >     > > theory
> >     >     > > >>> on our internal Stream application, and it could reduce
> >     > rebalance
> >     >     > time
> >     >     > > to a
> >     >     > > >>> few seconds when service is rolling restart.
> >     >     > > >>>
> >     >     > > >>>
> >     >     > > >>> Let me know your thoughts, thank you!
> >     >     > > >>>
> >     >     > > >>>
> >     >     > > >>> Best,
> >     >     > > >>>
> >     >     > > >>> Boyang
> >     >     > > >>>
> >     >     > > >>
> >     >     > > >>
> >     >     > > >>
> >     >     > > >> --
> >     >     > > >> -- Guozhang
> >     >     > >
> >     >     > >
> >     >     >
> >     >
> >     >
> >     >
> >
> >
> >     --
> >     -- Guozhang
> >
> >
> >
>



-- 
-- Guozhang

Reply via email to