Yes, I think it makes sense to let KafkaStreams expose the embedded consumers'
client ids and instance ids, e.g. in the ThreadMetadata exposed via
`localThreadsMetadata`.
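
To make the ids in question concrete, here is a rough sketch that assumes the
naming pattern quoted further down in this thread. `EmbeddedConsumerIds` and
both of its methods are made-up names for illustration, not existing Streams
API, and the assumption that thread ids run from 1 upwards is mine:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; this class does not exist in Kafka.
final class EmbeddedConsumerIds {

    private EmbeddedConsumerIds() { }

    // Mirrors the pattern quoted in this thread:
    // [streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"
    static String consumerClientId(String streamsClientId, int threadId) {
        return streamsClientId + "-StreamThread-" + threadId + "-consumer";
    }

    // One id per stream thread. Assumption: thread ids run from 1 to numThreads.
    static List<String> idsForInstance(String streamsClientId, int numThreads) {
        List<String> ids = new ArrayList<>();
        for (int threadId = 1; threadId <= numThreads; threadId++) {
            ids.add(consumerClientId(streamsClientId, threadId));
        }
        return ids;
    }
}
```

With a streams client-id of "my-app" and two threads this would give
my-app-StreamThread-1-consumer and my-app-StreamThread-2-consumer, i.e. the
kind of list an operator would want to read before shutting the instance down.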

I can open a new KIP for the change on the Streams side so that we can keep
this KIP scoped to the consumer side only.
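
For reference, the coordinator lookup rule from the LeaveGroupRequest proposal
quoted below (member.id first, then group.instance.id, otherwise an error)
could be sketched roughly as follows. This is a toy model under my own
assumptions, not broker code: `LeaveGroupResolver`, its fields and its methods
are all hypothetical, and an empty Optional stands in for the error code.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Toy model of the lookup rule only; not broker code, all names hypothetical.
final class LeaveGroupResolver {
    private final Set<String> memberIds = new HashSet<>();
    // group.instance.id -> member.id, populated for static members only
    private final Map<String, String> memberIdByInstanceId = new HashMap<>();

    // instanceId may be null for dynamic members.
    void addMember(String memberId, String instanceId) {
        memberIds.add(memberId);
        if (instanceId != null) {
            memberIdByInstanceId.put(instanceId, memberId);
        }
    }

    // Returns the member.id to remove, or empty when the request should be
    // rejected with an error code.
    Optional<String> resolve(String memberId, String instanceId) {
        if (memberId != null) {
            // 1) member.id wins; instance.id is ignored entirely
            return memberIds.contains(memberId) ? Optional.of(memberId) : Optional.empty();
        }
        if (instanceId != null) {
            // 2) fall back to the instance.id -> member.id mapping
            return Optional.ofNullable(memberIdByInstanceId.get(instanceId));
        }
        // 3) nothing to look up
        return Optional.empty();
    }
}
```

The point of keeping member.id nullable is that an operator who only knows the
instance id can still remove a crashed static member.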


Guozhang


On Fri, Nov 30, 2018 at 9:31 PM Boyang Chen <bche...@outlook.com> wrote:

> Thanks Guozhang and Mayuresh for the followup here.
> > Also I was thinking if we can have a replace API, that takes in a map of
> > old to new instance Ids. Such that we can replace a consumer.
> > IF we have this api, and if a consumer host goes down due to hardware
> > issues, we can have another host spin up and take its place. This is like a
> > cold backup which can be a step towards providing the hot backup that we
> > discussed earlier in the KIP.
> I like Mayuresh's suggestion, and I think we could prepare follow-up work
> once 345 is done to add a replace API. For the
> very first version I feel this is not a must-have.
>
> For Streams, I think we do not need an extra config for the instance id,
> instead, we can re-use the way we construct the embedded consumer's client
> id as:
>
> [streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"
>
> So as long as users specify a unique streams client-id, the resulting
> consumer client-id / instance-id should already be unique as well.
>
> So Guozhang, you mean Streams will enable static membership automatically,
> correct? That would make the logic simpler and require fewer code changes
> on the Streams side.
>
> As for the LeaveGroupRequest, as I understand it, your concern is that when
> we are shutting down a single Streams instance that may contain multiple
> threads, shutting down that instance would mean shutting down multiple
> members. Personally I'd prefer to make the LeaveGroupRequest API more
> general and less inclined to Streams (I think Mayuresh also suggested
> this). So I'd suggest that we keep the LeaveGroupRequest API as suggested,
> i.e. a list of member.instance.ids. And in Streams we can add a new API in
> KafkaStreams to expose:
>
> 1) the list of embedded consumer / producer client ids,
> 2) the producer's txn ids if EOS is turned on, and
> 3) the consumer's instance ids.
>
> I agree with the suggestion to make the leave group request change
> generic. So this new Streams API will be added on the rest layer to expose
> the necessary ids, correct?
>
> Looking forward to your confirmation 😊
>
> Best,
> Boyang
>
> ------------------------------
> *From:* Guozhang Wang <wangg...@gmail.com>
> *Sent:* Saturday, December 1, 2018 7:00 AM
> *To:* dev
> *Subject:* Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hi Boyang,
>
> For Streams, I think we do not need an extra config for the instance id,
> instead, we can re-use the way we construct the embedded consumer's client
> id as:
>
> [streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"
>
> So as long as users specify a unique streams client-id, the resulting
> consumer client-id / instance-id should already be unique as well.
>
> As for the LeaveGroupRequest, as I understand it, your concern is that when
> we are shutting down a single Streams instance that may contain multiple
> threads, shutting down that instance would mean shutting down multiple
> members. Personally I'd prefer to make the LeaveGroupRequest API more
> general and less inclined to Streams (I think Mayuresh also suggested
> this). So I'd suggest that we keep the LeaveGroupRequest API as suggested,
> i.e. a list of member.instance.ids. And in Streams we can add a new API in
> KafkaStreams to expose:
>
> 1) the list of embedded consumer / producer client ids,
> 2) the producer's txn ids if EOS is turned on, and
> 3) the consumer's instance ids.
>
> So that Streams operators can read those values from KafkaStreams directly
> before shutting it down and use the list in the LeaveGroupRequest API. How
> about that?
>
>
> Guozhang
>
>
> On Fri, Nov 30, 2018 at 7:45 AM Mayuresh Gharat <
> gharatmayures...@gmail.com>
> wrote:
>
> > I like Guozhang's suggestion to not have to wait for session timeout in
> > case we know that we want to downsize the consumer group and redistribute
> > the partitions among the remaining consumers.
> > IIUC, with the above suggestions, the admin api
> > "removeMemberFromGroup(groupId, list[instanceId])" or
> > "removeMemberFromGroup(groupId, instanceId)", will automatically cause a
> > rebalance, right?
> > I would prefer list[instanceId] because that's the more general scenario.
> >
> > Also I was thinking if we can have a replace API, that takes in a map of
> > old to new instance Ids. Such that we can replace a consumer.
> > IF we have this api, and if a consumer host goes down due to hardware
> > issues, we can have another host spin up and take its place. This is like a
> > cold backup which can be a step towards providing the hot backup that we
> > discussed earlier in the KIP.
> > Thoughts?
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Thu, Nov 29, 2018 at 1:30 AM Boyang Chen <bche...@outlook.com> wrote:
> >
> > > In fact I feel that it's more convenient for users to specify a list of
> > > instance id prefixes, because for a general consumer application we
> > > couldn't always find a single proper prefix to remove a list of consumers.
> > > So we are either adding list[instanceid prefix], or we could add two
> > > fields: instanceid prefix, and list[instanceid] for clarity purposes. As
> > > you know, the two options are equivalent since a full name is a special
> > > case of a prefix.
> > >
> > > Let me know your thoughts!
> > >
> > > Boyang
> > > ________________________________
> > > From: Boyang Chen <bche...@outlook.com>
> > > Sent: Thursday, November 29, 2018 3:39 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > specifying member id
> > >
> > > Thanks Guozhang for the new proposal here!
> > >
> > > So I'd like to propose a slightly modified version of LeaveGroupRequest:
> > > instead of letting the static member consumer clients themselves send the
> > > request (which means we still need to have some hidden configs to turn it
> > > off like we did today), how about just letting any other client send
> > > this request, since the LeaveGroupRequest only requires group.id and
> > > member.id? So back to your operational scenarios, if some static member
> > > has been found crashed and is not likely to come back, or we simply want
> > > to shrink the size of the group by shutting down some static members, we
> > > can use an admin client to send the LeaveGroupRequest after the instance
> > > has been completely shut down or crashed, to kick them out of the group
> > > and also trigger the rebalance.
> > >
> > > One issue, though, is that users may not know the member id required in
> > > the LeaveGroupRequest. To work around it we can add the
> > > `group.instance.id` along with the member id and then allow the member id
> > > to be nullable. The coordinator logic would then be modified as: 1) if
> > > member.id is specified, ignore instance.id and always use member.id to
> > > find the member to kick out, 2) otherwise, try with the instance.id to
> > > find the corresponding member.id and kick it out, 3) if none is found,
> > > reject with an error code.
> > >
> > > So in sum the alternative changes are:
> > >
> > > a) Modify LeaveGroupRequest to add group.instance.id
> > > b) Modify coordinator logic to handle such request on the broker side.
> > > c) Add a new API in AdminClient like "removeMemberFromGroup(groupId,
> > > instanceId)" which will be translated as a LeaveGroupRequest.
> > > d) [Optional] we can even batch the request by allowing
> > > "removeMemberFromGroup(groupId, list[instanceId])" and then make the
> > > `member.id` and `instance.id` fields of LeaveGroupRequest arrays instead
> > > of single entries.
> > > e) We can also remove the admin ConsumerRebalanceRequest for
> > > simplicity (why not? I'm paranoid about having as few request protocols as
> > > possible :), as it is not needed anymore with the above proposal.
> > > I agree that reusing LeaveGroupRequest is actually a good idea: we only
> > > need to iterate over an existing request format. Also I found that we
> > > haven't discussed how we want to enable this feature on Streams
> > > applications, which differ from common consumer applications in that a
> > > Streams app uses each stream thread as an individual consumer.
> > > For example if the user specifies the client id, the stream consumer
> > > client id will be like:
> > > User client id + "-StreamThread-" + thread id + "-consumer"
> > >
> > > So I'm thinking we should do something similar for defining
> > > `group.instance.id` on Streams. We shall define another config called
> > > `stream.instance.id` which would be used as a prefix, and for each thread
> > > consumer the formula will look like:
> > > `group.instance.id` = `stream.instance.id` + "-" + thread id + "-consumer"
> > >
> > > And for ease of use, the interface of the leave group request could
> > > include `group.instance.id.prefix` instead of `group.instance.id` so that
> > > we could batch remove consumers relating to a single stream instance.
> > > This is more intuitive and flexible since specifying names of 16~32 * n
> > > (n = number of stream instances to shut down) consumers is not an easy
> > > job without client management tooling.
> > >
> > > How does this workaround sound?
> > >
> > > Boyang
> > > ________________________________
> > > From: Guozhang Wang <wangg...@gmail.com>
> > > Sent: Thursday, November 29, 2018 2:38 AM
> > > To: dev
> > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > specifying member id
> > >
> > > Hi Boyang,
> > >
> > > I was thinking that with the optional static members in the admin
> > > ConsumerRebalanceRequest it should be sufficient to kick out the static
> > > members before their session timeout (arguably long in practice) has
> > > been reached. But now I see your concern is that in some situations the
> > > admin operators may not even know the full list of static members, but
> > > ONLY know which static member has failed and hence would like to kick it
> > > out of the group.
> > >
> > > So I'd like to propose a slightly modified version of LeaveGroupRequest:
> > > instead of letting the static member consumer clients themselves send the
> > > request (which means we still need to have some hidden configs to turn it
> > > off like we did today), how about just letting any other client send
> > > this request, since the LeaveGroupRequest only requires group.id and
> > > member.id? So back to your operational scenarios, if some static member
> > > has been found crashed and is not likely to come back, or we simply want
> > > to shrink the size of the group by shutting down some static members, we
> > > can use an admin client to send the LeaveGroupRequest after the instance
> > > has been completely shut down or crashed, to kick them out of the group
> > > and also trigger the rebalance.
> > >
> > > One issue, though, is that users may not know the member id required in
> > > the LeaveGroupRequest. To work around it we can add the
> > > `group.instance.id` along with the member id and then allow the member id
> > > to be nullable. The coordinator logic would then be modified as: 1) if
> > > member.id is specified, ignore instance.id and always use member.id to
> > > find the member to kick out, 2) otherwise, try with the instance.id to
> > > find the corresponding member.id and kick it out, 3) if none is found,
> > > reject with an error code.
> > >
> > > So in sum the alternative changes are:
> > >
> > > a) Modify LeaveGroupRequest to add group.instance.id
> > > b) Modify coordinator logic to handle such request on the broker side.
> > > c) Add a new API in AdminClient like "removeMemberFromGroup(groupId,
> > > instanceId)" which will be translated as a LeaveGroupRequest.
> > > d) [Optional] we can even batch the request by allowing
> > > "removeMemberFromGroup(groupId, list[instanceId])" and then make the
> > > `member.id` and `instance.id` fields of LeaveGroupRequest arrays instead
> > > of single entries.
> > > e) We can also remove the admin ConsumerRebalanceRequest for
> > > simplicity (why not? I'm paranoid about having as few request protocols as
> > > possible :), as it is not needed anymore with the above proposal.
> > >
> > >
> > > WDYT?
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Nov 28, 2018 at 5:34 AM Boyang Chen <bche...@outlook.com>
> wrote:
> > >
> > > > Thanks Guozhang and Mayuresh for the follow up! Answers are listed
> > below.
> > > >
> > > >
> > > > >  5. Regarding "So in summary, *the member will only be removed due to
> > > > > session timeout*. We shall remove it from both in-memory static member
> > > > > name mapping and member list." If the rebalance is invoked manually
> > > > > using the admin apis, how long should the group coordinator wait for
> > > > > the members of the group to send a JoinGroupRequest for participating
> > > > > in the rebalance? How is a lagging consumer handled?
> > > >
> > > > Great question. Let's use the c1~c4 example here:
> > > >
> > > >   1.  Consumers c1, c2, c3, c4 are in a stable state.
> > > >   2.  c4 goes down and we detect this issue before the session timeout
> > > >       through client monitoring. Initiate a ConsumerRebalanceRequest.
> > > >   3.  A rebalance kicks off, and after the rebalance timeout we
> > > >       shall keep the same assignment for c1~4, if the session timeout
> > > >       for c4 hasn't been reached.
> > > >   4.  Group is back to stable with c1~4 (although c4 is actually offline).
> > > >   5.  c4's session timeout is finally reached: another rebalance is
> > > >       triggered.
> > > >
> > > > For step 3, if the session timeout triggers within the rebalance timeout,
> > > > only c1~3 will be participating in the rebalance. This is what we mean by
> > > > saying "rebalance timeout shall not remove current members, only session
> > > > timeout will do."
> > > > As you can see this is not an ideal scenario: we trigger an extra
> > > > rebalance at step 5. In my reply to Guozhang I'm asking whether we should
> > > > still use LeaveGroupRequest for static members to send a signal to the
> > > > broker saying "I'm currently offline", and when we send
> > > > ConsumerRebalanceRequest to the broker, we will actually kick out c4
> > > > because it says it's offline already, saving one or multiple additional
> > > > rebalances later. This way the ConsumerRebalanceRequest will be more
> > > > effective in making a correct judgement on the group status since we have
> > > > more feedback from the client side.
> > > >
> > > > > - When we say that we would use invokeConsumerRebalance(groupId) to
> > > > > down scale, with the example in the above question, how will the
> > > > > GroupCoordinator know that c4 should be kicked out of the group, since
> > > > > we are trying to invoke rebalance proactively without waiting for c4's
> > > > > session timeout to expire? Should there be a way of telling the
> > > > > GroupCoordinator that consumer c4 has been kicked out of the
> > > > > groupId = "GroupA"?
> > > > The previous proposal should suffice to answer this question 😊
> > > >
> > > > > - Also it looks like the statement "If the `member.id` uses
> > > > > UNKNOWN_MEMBER_NAME, we shall always generate a new member id and replace
> > > > > the one within current map, if `group.member.name` is known. Also once we
> > > > > are done with KIP-394
> > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request>,
> > > > > all the join group requests are requiring `member.id` to physically enter
> > > > > the consumer group. This way the latest joined " is incomplete. Can you
> > > > > take a look at this?
> > > > > Also when we say "all the join group requests are requiring `member.id` to
> > > > > physically enter the consumer group." because a newly started consumer will
> > > > > not have a "member.id", I assume you mean, once the GroupCoordinator
> > > > > assigns a member.id to the newly started consumer, it has to use it for any
> > > > > future JoinGroupRequests. Is my understanding correct?
> > > > >
> > > > Thanks for catching it! And yes, we shall use one extra round-trip
> > > > between consumer and broker to inform the new member id allocation.
> > > >
> > > > Next is the replies to Guozhang's comment:
> > > > 2) I once had a discussion about the LeaveGroupRequest for static members,
> > > > and the reason for not having it for static members is that we'd need to
> > > > make it a configurable behavior as well (i.e. the likelihood that a static
> > > > member may shut down but come back later may be even larger than the
> > > > likelihood that a shut-down static member would not come back), and when a
> > > > shutdown is complete the instance cannot tell by itself whether or not it
> > > > will come back. Hence letting a third party (think: an admin used by K8s
> > > > plugins) issue a request to indicate static member changes would be more
> > > > plausible.
> > > >
> > > > I think having an optional list of all the static members that are still in
> > > > the group in the request, rather than the members to be removed (since the
> > > > latter looks a bit less flexible to me), is a good idea. Remember we allow a
> > > > group to have both static and dynamic members at the same time, so when
> > > > receiving the request, we will only do the diff and add / remove the static
> > > > members directly, while still letting the dynamic members try to re-join
> > > > the group with the rebalance timeout.
> > > > I'm also in favor of storing all the in-group static members. In fact
> > > > we could reuse the static membership mapping to store this information.
> > > > Do you think that we should let static members send a leave group
> > > > request to indicate their status of "leaving", and use
> > > > ConsumerRebalanceRequest to trigger a rebalance without them? I'm
> > > > suggesting we should remove those members when kicking off the rebalance
> > > > since we are shutting them down already.
> > > >
> > > > 3) Personally I favor "ids" over "names" :) since we already have some
> > > > "ids" and hence it sounds more consistent, plus on the producer side we
> > > > have a `transactional.id` whose semantics are a bit similar to this one,
> > > > i.e. for uniquely distinguishing a client which may come and go but
> > > > needs to persist over multiple "instance life-times".
> > > > Sure, we have enough votes for ids 😊 I will finalize the name as
> > > > `group.instance.id`, does that sound good?
> > > >
> > > > Best,
> > > > Boyang
> > > > ________________________________
> > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > Sent: Wednesday, November 28, 2018 4:51 AM
> > > > To: dev
> > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
> by
> > > > specifying member id
> > > >
> > > > Regarding Jason's question and Boyang's responses:
> > > >
> > > > 2) I once had a discussion about the LeaveGroupRequest for static members,
> > > > and the reason for not having it for static members is that we'd need to
> > > > make it a configurable behavior as well (i.e. the likelihood that a static
> > > > member may shut down but come back later may be even larger than the
> > > > likelihood that a shut-down static member would not come back), and when a
> > > > shutdown is complete the instance cannot tell by itself whether or not it
> > > > will come back. Hence letting a third party (think: an admin used by K8s
> > > > plugins) issue a request to indicate static member changes would be more
> > > > plausible.
> > > >
> > > > I think having an optional list of all the static members that are still in
> > > > the group in the request, rather than the members to be removed (since the
> > > > latter looks a bit less flexible to me), is a good idea. Remember we allow a
> > > > group to have both static and dynamic members at the same time, so when
> > > > receiving the request, we will only do the diff and add / remove the static
> > > > members directly, while still letting the dynamic members try to re-join
> > > > the group with the rebalance timeout.
> > > >
> > > > 3) Personally I favor "ids" over "names" :) since we already have some
> > > > "ids" and hence it sounds more consistent, plus on the producer side we
> > > > have a `transactional.id` whose semantics are a bit similar to this one,
> > > > i.e. for uniquely distinguishing a client which may come and go but
> > > > needs to persist over multiple "instance life-times".
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Nov 27, 2018 at 10:00 AM Mayuresh Gharat <
> > > > gharatmayures...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Boyang,
> > > > >
> > > > > Thanks for the replies. Please find the follow up queries below.
> > > > >
> > > > > >     5. Regarding "So in summary, *the member will only be removed due
> > > > > > to session timeout*. We shall remove it from both in-memory static
> > > > > > member name mapping and member list." If the rebalance is invoked
> > > > > > manually using the admin apis, how long should the group coordinator
> > > > > > wait for the members of the group to send a JoinGroupRequest for
> > > > > > participating in the rebalance? How is a lagging consumer handled?
> > > > > > The plan is to disable member kick out when rebalance.timeout is
> > > > > > reached, so basically we are not "waiting" for any join group
> > > > > > request from existing members; we shall just rebalance based on what
> > > > > > we currently have within the group metadata. A lagging consumer will
> > > > > > trigger a rebalance later if session timeout > rebalance timeout.
> > > > >
> > > > > >
> > > > > Just wanted to understand this better. Lets take an example, say we
> > > have
> > > > a
> > > > > > consumer group "GroupA" with 4 consumers  c1, c2, c3, c4.
> > > > > > Everything is running fine and suddenly C4 host has issues and it
> > > goes
> > > > > > down. Now we notice that we can still operate with c1, c2, c3 and
> > > don't
> > > > > > want to wait for
> > > > > > c4 to come back up. We use the admin api
> > > > > > "invokeConsumerRebalance("GroupA")".
> > > > > > Now the GroupCoordinator, will ask the members c1, c2, c3 to join
> > the
> > > > > > group again (in there heartBeatResponse) as first step of
> > rebalance.
> > > > > > Now lets say that c1, c2 immediately send a joinGroupRequest but
> c3
> > > is
> > > > > > delayed. At this stage, if we are not "waiting" on any join group
> > > > > request,
> > > > > > few things can happen :
> > > > > >
> > > > > >    - c4's partitions are distributed only among c1,c2. c3
> maintains
> > > its
> > > > > >    original assignment. c1, c2 will start processing the newly
> > > assigned
> > > > > >    partitions.
> > > > > >
> > > > > > OR
> > > > > >
> > > > > >    - c4's partitions are distributed among c1, c2, c3. c1 and c2
> > > start
> > > > > >    processing the newly assigned partitions. c3 gets to know
> about
> > > the
> > > > > newly
> > > > > >    assigned partitions later when it sends the JoinGroupRequest
> > > (which
> > > > > was
> > > > > >    delayed).
> > > > > >
> > > > > > OR
> > > > > >
> > > > > >    - Will the rebalance do a complete reassignment, where c1, c2,
> > c3
> > > > have
> > > > > >    to give up there partitions and all the partitions belonging
> to
> > > c1,
> > > > > c2, c3,
> > > > > >    c4 will be redistributed among c1, c2, c3 ? If this is the
> case,
> > > the
> > > > > >    GroupCoordinator needs to give some buffer time for c1, c2, c3
> > to
> > > > > revoke
> > > > > >    there partitions and rejoin the group.
> > > > > >
> > > > > > This is as per my understanding of how the KIP would work without
> > > > > changing
> > > > > > the underlying group coordination workflow. Please correct me if
> I
> > > > > > misunderstood something here.
> > > > > >
> > > > >
> > > > >
> > > > > > - When we say that we would use invokeConsumerRebalance(groupId) to
> > > > > > down scale, with the example in the above question, how will the
> > > > > > GroupCoordinator know that c4 should be kicked out of the group, since
> > > > > > we are trying to invoke rebalance proactively without waiting for c4's
> > > > > > session timeout to expire? Should there be a way of telling the
> > > > > > GroupCoordinator that consumer c4 has been kicked out of the
> > > > > > groupId = "GroupA"?
> > > > >
> > > > > > - Also it looks like the statement "If the `member.id` uses
> > > > > > UNKNOWN_MEMBER_NAME, we shall always generate a new member id and replace
> > > > > > the one within current map, if `group.member.name` is known. Also once we
> > > > > > are done with KIP-394
> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request>,
> > > > > > all the join group requests are requiring `member.id` to physically enter
> > > > > > the consumer group. This way the latest joined " is incomplete. Can you
> > > > > > take a look at this?
> > > > > > Also when we say "all the join group requests are requiring `member.id` to
> > > > > > physically enter the consumer group." because a newly started consumer will
> > > > > > not have a "member.id", I assume you mean, once the GroupCoordinator
> > > > > > assigns a member.id to the newly started consumer, it has to use it for any
> > > > > > future JoinGroupRequests. Is my understanding correct?
> > > > >
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Mayuresh
> > > > >
> > > > > On Mon, Nov 26, 2018 at 9:20 PM Boyang Chen <bche...@outlook.com>
> > > wrote:
> > > > >
> > > > > > > Thanks Mayuresh and Jason for your follow-ups! Let me try to answer
> > > > > > > both in this reply.
> > > > > >
> > > > > >
> > > > > > > >    1. Do you intend to have member.id be a static config like
> > > > > > > >    member.name after KIP-345 and KIP-394?
> > > > > > >
> > > > > > > No, we shall only rely on the broker to allocate member.id for the
> > > > > > > consumer instances. FYI, I already started the discussion thread for
> > > > > > > KIP-394 😊
> > > > > >
> > > > > > > >    2. Regarding "On client side, we add a new config called
> > > > > > > >    MEMBER_NAME in ConsumerConfig. On consumer service init, if the
> > > > > > > >    MEMBER_NAME config is set, we will put it in the initial join
> > > > > > > >    group request to identify itself as a static member (static
> > > > > > > >    membership); otherwise, we will still send UNKNOWN_MEMBER_ID to
> > > > > > > >    ask broker for allocating a new random ID (dynamic membership)."
> > > > > > > >       - What is the value of member_id sent in the first
> > > > > > > >       JoinGroupRequest when member_name is set (using static
> > > > > > > >       rebalance)? Is it UNKNOWN_MEMBER_ID?
> > > > > > >
> > > > > > > Yes, we could only use the unknown member id. Actually this part of
> > > > > > > the proposal is outdated; let me do another audit of the whole doc.
> > > > > > > Basically, it is currently impossible to send `member.id` when the
> > > > > > > consumer has restarted. Sorry for the confusion!
> > > > > >
> > > > > > > >    3. Regarding "we are requiring member.id (if not unknown) to
> > > > > > > >    match the value stored in cache, otherwise reply
> > > > > > > >    MEMBER_ID_MISMATCH. The edge case that if we could have members
> > > > > > > >    with the same `member.name` (for example mis-configured instances
> > > > > > > >    with a valid member.id but added a used member name on runtime).
> > > > > > > >    When member name has duplicates, we could refuse join request
> > > > > > > >    from members with an outdated `member.id` (since we update the
> > > > > > > >    mapping upon each join group request). In an edge case where the
> > > > > > > >    client hits this exception in the response, it is suggesting that
> > > > > > > >    some other consumer takes its spot."
> > > > > > > >       - The part of "some other consumer takes the spot" would be
> > > > > > > >       intentional, right? Also when you say " The edge case that if
> > > > > > > >       we could have members with the same `member.name` (for example
> > > > > > > >       mis-configured instances *with a valid member.id* but added a
> > > > > > > >       used member name on runtime).", what do you mean by *valid
> > > > > > > >       member id* here? Does it mean that there exists a mapping of
> > > > > > > >       member.name to member.id like *MemberA -> id1* on the
> > > > > > > >       GroupCoordinator and this consumer is trying to join with
> > > > > > > >       *member.name = MemberB and member.id = id1*?
> > > > > > >
> > > > > > > I would take Jason's advice that each time we have an unknown member
> > > > > > > joining the group, the broker will always assign a new and unique id
> > > > > > > to track its identity. In this way, a consumer with a duplicate
> > > > > > > member name will be fenced.
> > > > > >
> > > > > > > >    4. Depending on your explanation for point 2 and the point 3
> > > > > > > >    above regarding returning back MEMBER_ID_MISMATCH on having a
> > > > > > > >    matching member_name but unknown member_id, if the consumer
> > > > > > > >    sends "UNKNOWN_MEMBER_ID" on the first JoinGroupRequest and
> > > > > > > >    relies on the GroupCoordinator to give it a member_id, is the
> > > > > > > >    consumer supposed to remember member_id for
> > > > > > > >    joinGroupRequests? If yes, how are restarts handled?
> > > > > >
> > > > > > > As explained above, we shall not materialize the member.id.
> > > > > > > Instead we need to rely on the broker to allocate a unique id for
> > > > > > > the consumer, just like what we have now.
> > > > > >
> > > > > > > >    5. Regarding "So in summary, *the member will only be removed
> > > > > > > >    due to session timeout*. We shall remove it from both in-memory
> > > > > > > >    static member name mapping and member list."
> > > > > > > >       - If the rebalance is invoked manually using the admin
> > > > > > > >       apis, how long should the group coordinator wait for the
> > > > > > > >       members of the group to send a JoinGroupRequest for
> > > > > > > >       participating in the rebalance? How is a lagging consumer
> > > > > > > >       handled?
> > > > > >
> > > > > > > The plan is to disable member kick out when rebalance.timeout is
> > > > > > > reached, so basically we are not "waiting" for any join group
> > > > > > > request from existing members; we shall just rebalance based on
> > > > > > > what we currently have within the group metadata. A lagging
> > > > > > > consumer will trigger a rebalance later if session timeout >
> > > > > > > rebalance timeout.
> > > > > >
> > > > > > > >    6. Another detail to take care is that we need to
> > > > > > > >    automatically take the hash of group id so that we know which
> > > > > > > >    broker to send this request to.
> > > > > > > >       - I assume this should be same as the way we find the
> > > > > > > >       coordinator, today right? If yes, should we specify it in
> > > > > > > >       the KIP ?
> > > > > >
> > > > > > Yep, it is. Add FindCoordinatorRequest logic to the script.
> > > > > >
> > > > > > > >    7. Are there any specific failure scenarios when you say
> > > > > > > >    "other potential failure cases."? It would be good to mention
> > > > > > > >    them explicitly, if you think there are any.
> > > > > >
> > > > > > > Nah, I'm gonna remove it because it seems to be causing more
> > > > > > > confusion than making my assumption clear, which is "there could
> > > > > > > be other failure cases that I can't enumerate now" 😊
> > > > > >
> > > > > > > >    8. It would be good to have a rollback plan as you have for
> > > > > > > >    roll forward in the KIP.
> > > > > >
> > > > > > Great suggestion! Added a simple rollback plan.
> > > > > >
> > > > > >
> > > > > > Next is answering Jason's suggestions:
> > > > > >
> > > > > > > 1. This may be the same thing that Mayuresh is asking about. I
> > > > > > > think the suggestion in the KIP is that if a consumer sends
> > > > > > > JoinGroup with a member name, but no member id, then we will
> > > > > > > return the current member id associated with that name. It seems
> > > > > > > in this case that we wouldn't be able to protect from having two
> > > > > > > consumers active with the same configured member.name? For
> > > > > > > example, imagine that we had a consumer with member.name=A which
> > > > > > > is assigned member.id=1. Suppose it becomes a zombie and a new
> > > > > > > instance starts up with member.name=A. If it is also assigned
> > > > > > > member.id=1, then how can we detect the zombie if it comes back
> > > > > > > to life? Both instances will have the same member.id.
> > > > > > >
> > > > > > > The goal is to avoid a rebalance on a rolling restart, but we
> > > > > > > still need to fence previous members. I am wondering if we can
> > > > > > > generate a new member.id every time we receive a request from a
> > > > > > > static member with an unknown member id. If the old instance with
> > > > > > > the same member.name attempts any operation, then it will be
> > > > > > > fenced with an UNKNOWN_MEMBER_ID error. As long as the
> > > > > > > subscription of the new instance hasn't changed, then we can skip
> > > > > > > the rebalance and return the current assignment without forcing a
> > > > > > > rebalance.
> > > > > > >
> > > > > > > The trick to making this work is in the error handling of the
> > > > > > > zombie consumer. If the zombie simply resets its member.id and
> > > > > > > rejoins to get a new one upon receiving the UNKNOWN_MEMBER_ID
> > > > > > > error, then it would end up fencing the new member. We want to
> > > > > > > avoid this. There needs to be an expectation for static members
> > > > > > > that the member.id of a static member will not be changed except
> > > > > > > when a new member with the same member.name joins the group. Then
> > > > > > > we can treat UNKNOWN_MEMBER_ID as a fatal error for consumers
> > > > > > > with static member names.
> > > > > >
> > > > > > > Yep, I like this idea! Continuing to give out a fresh member.id
> > > > > > > when facing an anonymous request will definitely prevent
> > > > > > > processing bugs due to duplicate consumers. However, I don't
> > > > > > > think I fully understand the 3rd paragraph where you mentioned
> > > > > > > "There needs to be an expectation for static members that the
> > > > > > > member.id of a static member will not be changed except when a
> > > > > > > new member with the same member.name joins the group." How do you
> > > > > > > plan to know whether this member is a new member or an old
> > > > > > > member? I feel that even if a zombie consumer takes the
> > > > > > > ownership, it should be detected very quickly (as the MISMATCH_ID
> > > > > > > exception causes the original consumer instance to die) and the
> > > > > > > end user will start to fix it right away. Is there any similar
> > > > > > > logic we applied in fencing duplicate `transaction.id`?
> > > > > >
> > > > > > > 2. The mechanics of the ConsumerRebalance API seem unclear to me.
> > > > > > > As far as I understand it, it is used for scaling down a consumer
> > > > > > > group and somehow bypasses normal session timeout expiration. I
> > > > > > > am wondering how critical this piece is and whether we can leave
> > > > > > > it for future work. If not, then it would be helpful to elaborate
> > > > > > > on its implementation. How would the coordinator know which
> > > > > > > members to kick out of the group?
> > > > > >
> > > > > > > This API is needed when we need to immediately trigger a
> > > > > > > rebalance instead of waiting for the session timeout or rebalance
> > > > > > > timeout (emergent scale up/down). It is very necessary to have it
> > > > > > > for management purposes because the user could choose when to
> > > > > > > trigger a rebalance pretty freely, gaining more client side
> > > > > > > control.
> > > > > > >
> > > > > > > In the meanwhile I see your point that we need to actually have
> > > > > > > the ability to quickly kick out members that we plan to scale
> > > > > > > down (as the rebalance timeout no longer kicks any offline member
> > > > > > > out of the group). I will think of adding an optional list of
> > > > > > > members that are ready to be removed.
> > > > > > >
> > > > > > > Another idea is to let static members send `LeaveGroupRequest`
> > > > > > > when they are going offline (either scaling down or bouncing),
> > > > > > > and the broker will cache this information as "OfflineMembers"
> > > > > > > without triggering a rebalance. When handling a
> > > > > > > ConsumerRebalanceRequest, the broker will kick the static members
> > > > > > > that are currently offline and trigger a rebalance immediately.
> > > > > > > How does this plan sound?
> > > > > >
> > > > > > > 3. I've been holding back on mentioning this, but I think we
> > > > > > > should reconsider the name `member.name`. I think we want
> > > > > > > something that suggests its expectation of uniqueness in the
> > > > > > > group. How about `group.instance.id` to go along with `group.id`?
> > > > > >
> > > > > > > Yea, Dong and Stanislav also mentioned this naming. I personally
> > > > > > > buy in the namespace idea, and since we already use `member.name`
> > > > > > > in a lot of contexts, I decided to rename the config to
> > > > > > > `group.member.name`, which should be sufficient for solving all
> > > > > > > the concerns we have now. Sounds good?
> > > > > > >
> > > > > > >
> > > > > > > Thank you for your great suggestions! Let me know if my reply
> > > > > > > makes sense here.
> > > > > >
> > > > > >
> > > > > > Best,
> > > > > >
> > > > > > Boyang
> > > > > >
> > > > > > ________________________________
> > > > > > From: Jason Gustafson <ja...@confluent.io>
> > > > > > Sent: Tuesday, November 27, 2018 7:51 AM
> > > > > > To: dev
> > > > > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > > > > > rebalances by specifying member id
> > > > > >
> > > > > > Hi Boyang,
> > > > > >
> > > > > > > Thanks for the updates. Looks like we're headed in the right
> > > > > > > direction and clearly the interest that this KIP is receiving
> > > > > > > shows how strong the motivation is!
> > > > > >
> > > > > > I have a few questions:
> > > > > >
> > > > > > > 1. This may be the same thing that Mayuresh is asking about. I
> > > > > > > think the suggestion in the KIP is that if a consumer sends
> > > > > > > JoinGroup with a member name, but no member id, then we will
> > > > > > > return the current member id associated with that name. It seems
> > > > > > > in this case that we wouldn't be able to protect from having two
> > > > > > > consumers active with the same configured member.name? For
> > > > > > > example, imagine that we had a consumer with member.name=A which
> > > > > > > is assigned member.id=1. Suppose it becomes a zombie and a new
> > > > > > > instance starts up with member.name=A. If it is also assigned
> > > > > > > member.id=1, then how can we detect the zombie if it comes back
> > > > > > > to life? Both instances will have the same member.id.
> > > > > > >
> > > > > > > The goal is to avoid a rebalance on a rolling restart, but we
> > > > > > > still need to fence previous members. I am wondering if we can
> > > > > > > generate a new member.id every time we receive a request from a
> > > > > > > static member with an unknown member id. If the old instance with
> > > > > > > the same member.name attempts any operation, then it will be
> > > > > > > fenced with an UNKNOWN_MEMBER_ID error. As long as the
> > > > > > > subscription of the new instance hasn't changed, then we can skip
> > > > > > > the rebalance and return the current assignment without forcing a
> > > > > > > rebalance.
> > > > > > >
> > > > > > > The trick to making this work is in the error handling of the
> > > > > > > zombie consumer. If the zombie simply resets its member.id and
> > > > > > > rejoins to get a new one upon receiving the UNKNOWN_MEMBER_ID
> > > > > > > error, then it would end up fencing the new member. We want to
> > > > > > > avoid this. There needs to be an expectation for static members
> > > > > > > that the member.id of a static member will not be changed except
> > > > > > > when a new member with the same member.name joins the group. Then
> > > > > > > we can treat UNKNOWN_MEMBER_ID as a fatal error for consumers
> > > > > > > with static member names.
> > > > > > >
> > > > > > > 2. The mechanics of the ConsumerRebalance API seem unclear to me.
> > > > > > > As far as I understand it, it is used for scaling down a consumer
> > > > > > > group and somehow bypasses normal session timeout expiration. I
> > > > > > > am wondering how critical this piece is and whether we can leave
> > > > > > > it for future work. If not, then it would be helpful to elaborate
> > > > > > > on its implementation. How would the coordinator know which
> > > > > > > members to kick out of the group?
> > > > > > >
> > > > > > > 3. I've been holding back on mentioning this, but I think we
> > > > > > > should reconsider the name `member.name`. I think we want
> > > > > > > something that suggests its expectation of uniqueness in the
> > > > > > > group. How about `group.instance.id` to go along with `group.id`?
> > > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > > >
> > > > > >
> > > > > > > On Mon, Nov 26, 2018 at 10:18 AM Mayuresh Gharat
> > > > > > > <gharatmayures...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Boyang,
> > > > > > >
> > > > > > > > Thanks a lot for replying to all the queries and discussions
> > > > > > > > here, so patiently.
> > > > > > > > Really appreciate it.
> > > > > > > >
> > > > > > > > Had a few questions and suggestions after rereading the current
> > > > > > > > version of the KIP :
> > > > > > >
> > > > > > >
> > > > > > > >    1. Do you intend to have member.id as a static config like
> > > > > > > >    member.name after KIP-345 and KIP-394?
> > > > > > > >    2. Regarding "On client side, we add a new config called
> > > > > > > >    MEMBER_NAME in ConsumerConfig. On consumer service init, if
> > > > > > > >    the MEMBER_NAME config is set, we will put it in the initial
> > > > > > > >    join group request to identify itself as a static member
> > > > > > > >    (static membership); otherwise, we will still send
> > > > > > > >    UNKNOWN_MEMBER_ID to ask broker for allocating a new random
> > > > > > > >    ID (dynamic membership)."
> > > > > > > >       - What is the value of member_id sent in the first
> > > > > > > >       JoinGroupRequest when member_name is set (using static
> > > > > > > >       rebalance)? Is it UNKNOWN_MEMBER_ID?
> > > > > > > >    3. Regarding "we are requiring member.id (if not unknown) to
> > > > > > > >    match the value stored in cache, otherwise reply
> > > > > > > >    MEMBER_ID_MISMATCH. The edge case that if we could have
> > > > > > > >    members with the same `member.name` (for example
> > > > > > > >    mis-configured instances with a valid member.id but added a
> > > > > > > >    used member name on runtime). When member name has
> > > > > > > >    duplicates, we could refuse join request from members with an
> > > > > > > >    outdated `member.id` (since we update the mapping upon each
> > > > > > > >    join group request). In an edge case where the client hits
> > > > > > > >    this exception in the response, it is suggesting that some
> > > > > > > >    other consumer takes its spot."
> > > > > > > >       - The part of "some other consumer takes the spot" would be
> > > > > > > >       intentional, right? Also when you say "The edge case that
> > > > > > > >       if we could have members with the same `member.name` (for
> > > > > > > >       example mis-configured instances *with a valid member.id*
> > > > > > > >       but added a used member name on runtime).", what do you
> > > > > > > >       mean by *valid member id* here? Does it mean that there
> > > > > > > >       exist a mapping of member.name to member.id like
> > > > > > > >       *MemberA -> id1* on the GroupCoordinator and this consumer
> > > > > > > >       is trying to join with *member.name = MemberB and
> > > > > > > >       member.id = id1*?
> > > > > > > >    4. Depending on your explanation for point 2 and the point 3
> > > > > > > >    above regarding returning back MEMBER_ID_MISMATCH on having a
> > > > > > > >    matching member_name but unknown member_id, if the consumer
> > > > > > > >    sends "UNKNOWN_MEMBER_ID" on the first JoinGroupRequest and
> > > > > > > >    relies on the GroupCoordinator to give it a member_id, is the
> > > > > > > >    consumer supposed to remember member_id for
> > > > > > > >    joinGroupRequests? If yes, how are restarts handled?
> > > > > > > >    5. Regarding "So in summary, *the member will only be removed
> > > > > > > >    due to session timeout*. We shall remove it from both in-memory
> > > > > > > >    static member name mapping and member list."
> > > > > > > >       - If the rebalance is invoked manually using the admin
> > > > > > > >       apis, how long should the group coordinator wait for the
> > > > > > > >       members of the group to send a JoinGroupRequest for
> > > > > > > >       participating in the rebalance? How is a lagging consumer
> > > > > > > >       handled?
> > > > > > > >    6. Another detail to take care is that we need to
> > > > > > > >    automatically take the hash of group id so that we know which
> > > > > > > >    broker to send this request to.
> > > > > > > >       - I assume this should be same as the way we find the
> > > > > > > >       coordinator, today right? If yes, should we specify it in
> > > > > > > >       the KIP ?
> > > > > > > >    7. Are there any specific failure scenarios when you say
> > > > > > > >    "other potential failure cases."? It would be good to mention
> > > > > > > >    them explicitly, if you think there are any.
> > > > > > > >    8. It would be good to have a rollback plan as you have for
> > > > > > > >    roll forward in the KIP.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Mayuresh
> > > > > > >
> > > > > > > > On Mon, Nov 26, 2018 at 8:17 AM Mayuresh Gharat
> > > > > > > > <gharatmayures...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Boyang,
> > > > > > > >
> > > > > > > > > Do you have a discuss thread for KIP-394 that you mentioned
> > > > > > > > > here?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Mayuresh
> > > > > > > >
> > > > > > > > > On Mon, Nov 26, 2018 at 4:52 AM Boyang Chen
> > > > > > > > > <bche...@outlook.com> wrote:
> > > > > > > >
> > > > > > > >> Hey Dong, thanks for the follow-up here!
> > > > > > > >>
> > > > > > > >>
> > > > > > > > >> 1) It is not very clear to the user what is the difference
> > > > > > > > >> between member.name and client.id as both seem to be used to
> > > > > > > > >> identify the consumer. I am wondering if it would be more
> > > > > > > > >> intuitive to name it group.member.name (preferred choice since
> > > > > > > > >> it matches the current group.id config name) or
> > > > > > > > >> rebalance.member.name to explicitly show that the id is solely
> > > > > > > > >> used for rebalance.
> > > > > > > > >> Great question. I feel `member.name` is enough to explain
> > > > > > > > >> itself; it does not seem very helpful to make the config name
> > > > > > > > >> longer. Comparing `name` with `id` gives the user the
> > > > > > > > >> impression that they have control over it with a customized
> > > > > > > > >> rule, rather than it being decided by the library.
> > > > > > > >>
> > > > > > > > >> 2) In the interface change section it is said that
> > > > > > > > >> GroupMaxSessionTimeoutMs will be changed to 30 minutes. It
> > > > > > > > >> seems to suggest that we will change the default value of this
> > > > > > > > >> config. It does not seem necessary to increase the time of
> > > > > > > > >> consumer failure detection when user doesn't use static
> > > > > > > > >> membership.
> > > > > > > > >> Also, say static membership is enabled, then this default
> > > > > > > > >> config change will cause a partition to be unavailable for
> > > > > > > > >> consumption for 30 minutes if there is hard consumer failure,
> > > > > > > > >> which seems to be worse experience than having unnecessary
> > > > > > > > >> rebalance (when this timeout is small), particularly for new
> > > > > > > > >> users of Kafka. Could you explain more why we should make this
> > > > > > > > >> change?
> > > > > > > > >> We are not changing the default session timeout value. We are
> > > > > > > > >> just changing the cap we are enforcing on the session timeout
> > > > > > > > >> max value. So this change does not affect what kind of
> > > > > > > > >> membership the end user is using, and loosening the cap gives
> > > > > > > > >> the end user more flexibility on the trade-off between
> > > > > > > > >> liveness and stability.
> > > > > > > >>
> > > > > > > > >> 3) Could we just combine MEMBER_ID_MISMATCH and
> > > > > > > > >> DUPLICATE_STATIC_MEMBER into one error? It seems that these
> > > > > > > > >> two errors are currently handled by the consumer in the same
> > > > > > > > >> way. And we also don't expect MEMBER_ID_MISMATCH to happen.
> > > > > > > > >> Thus it is not clear what is the benefit of having two errors.
> > > > > > > > >> I agree that we should remove the DUPLICATE_STATIC_MEMBER
> > > > > > > > >> error because with KIP-394
> > > > > > > > >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member+id+for+initial+join+group+request>
> > > > > > > > >> we will automatically fence all join requests with
> > > > > > > > >> UNKNOWN_MEMBER_ID.
> > > > > > > >>
> > > > > > > > >> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join
> > > > > > > > >> group contains member name which is already in the consumer
> > > > > > > > >> group, however the member id was missing". After a consumer is
> > > > > > > > >> restarted, it will send a JoinGroupRequest with an existing
> > > > > > > > >> memberName (as the coordinator has not expired this member
> > > > > > > > >> from the memory) and memberId =
> > > > > > > > >> JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not
> > > > > > > > >> persisted across consumer restart in the consumer side). Does
> > > > > > > > >> it mean that JoinGroupRequest from a newly restarted consumer
> > > > > > > > >> will always be rejected until the sessionTimeoutMs has passed?
> > > > > > > > >> Same answer as question 3). This part of the logic shall be
> > > > > > > > >> removed from the proposal.
> > > > > > > >>
> > > > > > > > >> 5) It seems that we always add two methods to the interface
> > > > > > > > >> org.apache.kafka.clients.admin.AdminClient.java, one with
> > > > > > > > >> options and the other without option. Could this be specified
> > > > > > > > >> in the interface change section?
> > > > > > > >> Sounds good! Added both methods.
> > > > > > > >>
> > > > > > > > >> 6) Do we plan to have off-the-shelf command line tool for SRE
> > > > > > > > >> to trigger rebalance? If so, we probably want to specify the
> > > > > > > > >> command line tool interface similar to
> > > > > > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts
> > > > > > > > >> .
> > > > > > > > >> Added the script.
> > > > > > > >>
> > > > > > > > >> 7) Would it be simpler to replace name "forceStaticRebalance"
> > > > > > > > >> with "invokeConsumerRebalance"? It is not very clear what is
> > > > > > > > >> the extra meaning of the word "force" as compared to "trigger"
> > > > > > > > >> or "invoke". And it seems simpler to allow this API to trigger
> > > > > > > > >> rebalance regardless of whether consumer is configured with
> > > > > > > > >> memberName.
> > > > > > > > >> Sounds good. Right now I feel for both static and dynamic
> > > > > > > > >> membership it is more manageable to introduce the consumer
> > > > > > > > >> rebalance method through the admin client API.
> > > > > > > >>
> > > > > > > > >> 8) It is not very clear how the newly added AdminClient API
> > > > > > > > >> triggers rebalance. For example, does it send request? Can
> > > > > > > > >> this be explained in the KIP?
> > > > > > > >>
> > > > > > > >> Sure, I will add more details to the API.
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> Thanks again for the helpful suggestions!
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> Best,
> > > > > > > >> Boyang
> > > > > > > >>
> > > > > > > >> ________________________________
> > > > > > > >> From: Dong Lin <lindon...@gmail.com>
> > > > > > > >> Sent: Saturday, November 24, 2018 2:54 PM
> > > > > > > >> To: dev
> > > > > > > > >> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > > > > > > >> rebalances by specifying member id
> > > > > > > >>
> > > > > > > >> Hey Boyang,
> > > > > > > >>
> > > > > > > >> Thanks for the update! Here are some followup comments:
> > > > > > > >>
> > > > > > > >> 1) It is not very clear to the user what is the difference
> > > between
> > > > > > > >> member.name and client.id as both seems to be used to
> > identify
> > > > the
> > > > > > > >> consumer. I am wondering if it would be more intuitive to
> name
> > > it
> > > > > > > >> group.member.name (preferred choice since it matches the
> > > current
> > > > > > > group.id
> > > > > > > >> config name) or rebalance.member.name to explicitly show
> that
> > > the
> > > > > id
> > > > > > is
> > > > > > > >> solely used for rebalance.
> > > > > > > >>
> > > > > > > >> 2) In the interface change section it is said that
> > > > > > > >> GroupMaxSessionTimeoutMs
> > > > > > > >> will be changed to 30 minutes. It seems to suggest that we
> > will
> > > > > change
> > > > > > > the
> > > > > > > >> default value of this config. It does not seem necessary to
> > > > increase
> > > > > > the
> > > > > > > >> time of consumer failure detection when user doesn't use
> > static
> > > > > > > >> membership.
> > > > > > > >> Also, say static membership is enabled, then this default
> > config
> > > > > > change
> > > > > > > >> will cause a partition to be unavailable for consumption for
> > 30
> > > > > > minutes
> > > > > > > if
> > > > > > > >> there is hard consumer failure, which seems to be worse
> > > experience
> > > > > > than
> > > > > > > >> having unnecessary rebalance (when this timeout is small),
> > > > > > particularly
> > > > > > > >> for
> > > > > > > >> new users of Kafka. Could you explain more why we should
> make
> > > this
> > > > > > > change?
> > > > > > > >>
> > > > > > > >> 3) Could we just combine MEMBER_ID_MISMATCH and
> > > > > > DUPLICATE_STATIC_MEMBER
> > > > > > > >> into one error? It seems that these two errors are currently
> > > > handled
> > > > > > by
> > > > > > > >> the
> > > > > > > >> consumer in the same way. And we don't also don't expect
> > > > > > > >> MEMBER_ID_MISMATCH
> > > > > > > >> to happen. Thus it is not clear what is the benefit of
> having
> > > two
> > > > > > > errors.
> > > > > > > >>
> > > > > > > >> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join
> > group
> > > > > > > contains
> > > > > > > >> member name which is already in the consumer group, however
> > the
> > > > > member
> > > > > > > id
> > > > > > > >> was missing". After a consumer is restarted, it will send a
> > > > > > > >> JoinGroupRequest with an existing memberName (as the
> > coordinator
> > > > has
> > > > > > not
> > > > > > > >> expired this member from the memory) and memberId
> > > > > > > >> = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not
> > > > > persisted
> > > > > > > >> across consumer restart in the consumer side). Does it mean
> > that
> > > > > > > >> JoinGroupRequest from a newly restarted consumer will always
> > be
> > > > > > rejected
> > > > > > > >> until the sessionTimeoutMs has passed?
> > > > > > > >>
> > > > > > > >> 5) It seems that we always add two methods to the interface
> > > > > > > >> org.apache.kafka.clients.admin.AdminClient.java, one with
> > > options
> > > > > and
> > > > > > > the
> > > > > > > >> other without option. Could this be specified in the
> interface
> > > > > change
> > > > > > > >> section?
> > > > > > > >>
> > > > > > > >> 6) Do we plan to have off-the-shelf command line tool for
> SRE
> > to
> > > > > > trigger
> > > > > > > >> rebalance? If so, we probably want to specify the command
> line
> > > > tool
> > > > > > > >> interface similar to
> > > > > > > >>
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-113%253A%2BSupport%2Breplicas%2Bmovement%2Bbetween%2Blog%2Bdirectories%23KIP-113%3ASupportreplicasmovementbetweenlogdirectories-Scripts&amp;data=02%7C01%7C%7C99d1b807ce7e4fd280ac08d65718dcda%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636792161627905154&amp;sdata=egpgN2DcSoFFLeOrGRv9EgtAuMLUxrvazXsUOIKWsGE%3D&amp;reserved=0
> > > > > > > >> .
> > > > > > > >>
> > > > > > > > >> 7) Would it be simpler to replace the name "forceStaticRebalance"
> > > > > > > > >> with "invokeConsumerRebalance"? It is not very clear what extra
> > > > > > > > >> meaning the word "force" carries compared to "trigger" or
> > > > > > > > >> "invoke". It also seems simpler to allow this API to trigger a
> > > > > > > > >> rebalance regardless of whether the consumer is configured with
> > > > > > > > >> memberName.
> > > > > > > >>
> > > > > > > > >> 8) It is not very clear how the newly added AdminClient API
> > > > > > > > >> triggers a rebalance. For example, does it send a request? Can
> > > > > > > > >> this be explained in the KIP?
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >> Dong
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> On Thu, Nov 22, 2018 at 6:37 AM Boyang Chen <
> > > bche...@outlook.com>
> > > > > > > wrote:
> > > > > > > >>
> > > > > > > >> > Hey Mayuresh,
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > > >> > Thanks for your feedback! I will try to do another checklist
> > > > > > > > >> > here.
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > > By this you mean, even if the application has not called
> > > > > > > >> > > KafkaConsumer.poll() within session timeout, it will not
> > be
> > > > > > sending
> > > > > > > >> the
> > > > > > > >> > > LeaveGroup request, right?
> > > > > > > >> >
> > > > > > > > >> > Yep, that's true: we will prevent clients from sending a leave
> > > > > > > > >> > group request when they are configured with `member.name`.
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > > When is the member.name removed from this map?
> > > > > > > > >> > Good question. Under static membership, we will only kick a
> > > > > > > > >> > member out of the group due to session timeout. Let me update
> > > > > > > > >> > the KIP to state that clearly.
> > > > > > > >> >
> > > > > > > >> > > How is this case (missing member id) handled on the
> client
> > > > side?
> > > > > > > What
> > > > > > > >> is
> > > > > > > >> > the application that
> > > > > > > > >> > > is using the KafkaConsumer supposed to do in this
> scenario?
> > > > > > > > >> > I have extended the two exceptions within join group response
> > > > > > > > >> > V4. Basically, I define the corresponding action for both to be
> > > > > > > > >> > failing the client application immediately, because so far it
> > > > > > > > >> > is unknown what kind of client issue could trigger them. After
> > > > > > > > >> > the first version, we will keep enhancing the error handling
> > > > > > > > >> > logic!
> > > > > > > >> >
> > > > > > > >> > > This would mean that it might take more time to detect
> > > unowned
> > > > > > topic
> > > > > > > >> > > partitions and may cause delay for applications that
> > perform
> > > > > data
> > > > > > > >> > mirroring
> > > > > > > >> > > tasks. I discussed this with our sre and we have a
> > > suggestion
> > > > to
> > > > > > > make
> > > > > > > >> > here
> > > > > > > >> > > as listed below separately.
> > > > > > > > >> > The goal of raising the session timeout cap is to serve users
> > > > > > > > >> > with good client-side monitoring tools that can auto-heal dead
> > > > > > > > >> > consumers very quickly. So extending the session timeout to a
> > > > > > > > >> > reasonable value is optional and depends on each client
> > > > > > > > >> > scenario.
> > > > > > > >> >
> > > > > > > >> > > you meant remove unjoined members of the group, right ?
> > > > > > > >> > Yep, there is a typo. Thanks for catching this!
> > > > > > > >> >
> > > > > > > >> > > What do you mean by " Internally we would optimize this
> > > logic
> > > > by
> > > > > > > >> having
> > > > > > > >> > > rebalance timeout only in charge of stopping prepare
> > > rebalance
> > > > > > > stage,
> > > > > > > >> > > without removing non-responsive members immediately."
> > There
> > > > > would
> > > > > > > not
> > > > > > > >> be
> > > > > > > >> > a
> > > > > > > >> > > full rebalance if the lagging consumer sent a JoinGroup
> > > > request
> > > > > > > later,
> > > > > > > >> > > right ? If yes, can you highlight this in the KIP ?
> > > > > > > > >> > No, there won't be. We want to limit the rebalance timeout to
> > > > > > > > >> > serving only as a timer that ends the prepare rebalance stage.
> > > > > > > > >> > This way, late-joining static members will not trigger further
> > > > > > > > >> > rebalances as long as they are within the session timeout. I
> > > > > > > > >> > added your highlight to the KIP!
> > > > > > > >> >
> > > > > > > >> > > The KIP talks about scale up scenario but its not quite
> > > clear
> > > > > how
> > > > > > we
> > > > > > > >> > > handle it. Are we adding a separate "expansion.timeout"
> or
> > > we
> > > > > > adding
> > > > > > > >> > status
> > > > > > > >> > > "learner" ?. Can you shed more light on how this is
> > handled
> > > in
> > > > > the
> > > > > > > >> KIP,
> > > > > > > >> > if
> > > > > > > >> > > its handled?
> > > > > > > > >> > Updated the KIP: we will not cover the scale-up case in 345,
> > > > > > > > >> > because we believe the client side could handle this logic
> > > > > > > > >> > better.
> > > > > > > >> >
> > > > > > > >> > > I think Jason had brought this up earlier about having a
> > way
> > > > to
> > > > > > say
> > > > > > > >> how
> > > > > > > >> > > many members/consumer hosts are you choosing to be in
> the
> > > > > consumer
> > > > > > > >> group.
> > > > > > > >> > > If we can do this, then in case of mirroring
> applications
> > we
> > > > can
> > > > > > do
> > > > > > > >> this
> > > > > > > >> > :
> > > > > > > >> > > Lets say we have a mirroring application that consumes
> > from
> > > > > Kafka
> > > > > > > >> cluster
> > > > > > > >> > > A and produces to Kafka cluster B.
> > > > > > > >> > > Depending on the data and the Kafka cluster
> configuration,
> > > > Kafka
> > > > > > > >> service
> > > > > > > >> > > providers can set a mirroring group saying that it will
> > > take,
> > > > > for
> > > > > > > >> example
> > > > > > > >> > > 300 consumer hosts/members to achieve the desired
> > throughput
> > > > and
> > > > > > > >> latency
> > > > > > > >> > > for mirroring and can have additional 10 consumer hosts
> as
> > > > spare
> > > > > > in
> > > > > > > >> the
> > > > > > > >> > > same group.
> > > > > > > > >> > > So the first 300 members/consumers to join the group will
> > > > > > > > >> > > start mirroring the data from Kafka cluster A to Kafka
> > > > > > > > >> > > cluster B.
> > > > > > > > >> > > The remaining 10 consumer members can sit idle.
> > > > > > > > >> > > The moment one of the consumers (for example: consumer
> > > > > > > > >> > > number 54) from the first 300 members goes out of the group
> > > > > > > > >> > > (crossed session timeout), it (the groupCoordinator) can
> > > > > > > > >> > > just assign the topicPartitions from consumer member 54 to
> > > > > > > > >> > > one of the spare hosts.
> > > > > > > >> > > Once the consumer member 54 comes back up, it can start
> as
> > > > > being a
> > > > > > > >> part
> > > > > > > >> > of
> > > > > > > >> > > the spare pool.
> > > > > > > >> > > This enables us to have lower session timeouts and low
> > > latency
> > > > > > > >> mirroring,
> > > > > > > >> > > in cases where the service providers are OK with having
> > > spare
> > > > > > hosts.
> > > > > > > >> > > This would mean that we would tolerate n consumer
> members
> > > > > leaving
> > > > > > > and
> > > > > > > >> > > rejoining the group and still provide low latency as
> long
> > > as n
> > > > > <=
> > > > > > > >> number
> > > > > > > >> > of
> > > > > > > >> > > spare consumers.
> > > > > > > >> > > If there are no spare host available, we can get back to
> > the
> > > > > idea
> > > > > > as
> > > > > > > >> > > described in the KIP.
> > > > > > > > >> > Great idea! In fact, on top of static membership we could later
> > > > > > > > >> > introduce APIs to set hard-coded client ids for the group and
> > > > > > > > >> > replace a dead host, or, as you proposed, define spare hosts,
> > > > > > > > >> > which I understand as hot backups. I will put both Jason's and
> > > > > > > > >> > your suggestions into a separate section called "Future
> > > > > > > > >> > works". Note that this spare host idea may also be solvable
> > > > > > > > >> > through the rebalance protocol, IMO.
> > > > > > > >> >
> > > > > > > >> > Thank you again for the great feedback!
> > > > > > > >> >
> > > > > > > >> > Boyang
> > > > > > > >> > ________________________________
> > > > > > > >> > From: Boyang Chen <bche...@outlook.com>
> > > > > > > >> > Sent: Thursday, November 22, 2018 3:39 PM
> > > > > > > >> > To: dev@kafka.apache.org
> > > > > > > >> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > > > rebalances
> > > > > > by
> > > > > > > >> > specifying member id
> > > > > > > >> >
> > > > > > > >> > Hey Dong, sorry for missing your message. I couldn't find
> > your
> > > > > email
> > > > > > > on
> > > > > > > >> my
> > > > > > > >> > thread, so I will just do a checklist here!
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > > >> > 1) The motivation currently explicitly states that the goal is
> > > > > > > > >> > to improve performance for heavy-state applications. It seems
> > > > > > > > >> > that the motivation can be stronger with the following
> > > > > > > > >> > use-case. Currently, for a MirrorMaker cluster with e.g. 100
> > > > > > > > >> > MirrorMaker processes, it will take a long time to rolling
> > > > > > > > >> > bounce the entire MirrorMaker cluster. Each MirrorMaker process
> > > > > > > > >> > restart will trigger a rebalance, which currently pauses the
> > > > > > > > >> > consumption of all partitions of the MirrorMaker cluster.
> > > > > > > > >> > With the change stated in this patch, as long as a MirrorMaker
> > > > > > > > >> > can restart within the specified timeout (e.g. 2 minutes), we
> > > > > > > > >> > only need a constant number of rebalances (e.g. for leader
> > > > > > > > >> > restart) for the entire rolling bounce, which will
> > > > > > > > >> > significantly improve the availability of the MirrorMaker
> > > > > > > > >> > pipeline. In my opinion, the main benefit of the KIP is to
> > > > > > > > >> > avoid unnecessary rebalances if the consumer process can be
> > > > > > > > >> > restarted soon, which helps performance even if the overhead of
> > > > > > > > >> > state shuffling for a given process is small.
> > > > > > > >> >
> > > > > > > >> > I just rephrased this part and added it to the KIP. Thanks
> > for
> > > > > > making
> > > > > > > >> the
> > > > > > > >> > motivation more solid!
> > > > > > > >> >
> > > > > > > >> > 2) In order to simplify the KIP reading, can you follow
> the
> > > > > writeup
> > > > > > > >> style
> > > > > > > >> > of other KIP (e.g. KIP-98) and list the interface change
> > such
> > > as
> > > > > new
> > > > > > > >> > configs (e.g. registration timeout), new request/response,
> > new
> > > > > > > >> AdminClient
> > > > > > > >> > API and new error code (e.g. DUPLICATE_STATIC_MEMBER)?
> > > Currently
> > > > > > some
> > > > > > > of
> > > > > > > >> > these are specified in the Proposed Change section which
> > makes
> > > > it
> > > > > a
> > > > > > > bit
> > > > > > > >> > inconvenient to understand the new interface that will be
> > > > exposed
> > > > > to
> > > > > > > >> user.
> > > > > > > >> > Explanation of the current two-phase rebalance protocol
> > > probably
> > > > > can
> > > > > > > be
> > > > > > > >> > moved out of public interface section.
> > > > > > > >> > This is a great suggestion! I just consolidated all the
> > public
> > > > API
> > > > > > > >> > changes, and the whole KIP
> > > > > > > >> > looks much more organized!
> > > > > > > >> >
> > > > > > > > >> > 3) There are currently two versions of JoinGroupRequest in
> > the
> > > > KIP
> > > > > > and
> > > > > > > >> only
> > > > > > > >> > one of them has field memberId. This seems confusing.
> > > > > > > >> > Yep, I already found this issue and fixed it.
> > > > > > > >> >
> > > > > > > >> > 4) It is mentioned in the KIP that "An admin API to force
> > > > > rebalance
> > > > > > > >> could
> > > > > > > >> > be helpful here, but we will make a call once we finished
> > the
> > > > > major
> > > > > > > >> > implementation". So this seems to be still an open
> question
> > in
> > > > the
> > > > > > > >> current
> > > > > > > >> > design. We probably want to agree on this before voting
> for
> > > the
> > > > > KIP.
> > > > > > > >> > We have finalized the idea that this API is needed.
> > > > > > > >> >
> > > > > > > > >> > 5) The KIP currently adds new config MEMBER_NAME for consumer.
> > > > > > > > >> > Can you specify the name of the config key and the default
> > > > > > > > >> > config value? Possible default values include empty string or
> > > > > > > > >> > null (similar to transaction.id in producer config).
> > > > > > > >> > I have defined the `member.name` in "New configuration"
> > > > section.
> > > > > > > >> >
> > > > > > > > >> > 6) Regarding the use of the topic "static_member_map" to
> > > > > > > > >> > persist member name map, currently if consumer coordinator
> > > > > > > > >> > broker goes offline, rebalance is triggered and consumers will
> > > > > > > > >> > try to connect to the new coordinator. If these consumers can
> > > > > > > > >> > connect to the new coordinator within max.poll.interval.ms,
> > > > > > > > >> > which by default is 5 minutes, given that broker can use a
> > > > > > > > >> > deterministic algorithm to determine the partition ->
> > > > > > > > >> > member_name mapping, each consumer should get assigned the same
> > > > > > > > >> > set of partitions without requiring state shuffling. So it is
> > > > > > > > >> > not clear whether we have a strong use-case for this new logic.
> > > > > > > > >> > Can you help clarify what is the benefit of using topic
> > > > > > > > >> > "static_member_map" to persist member name map?
> > > > > > > > >> > I have discussed this with Guozhang offline, and I believe
> > > > > > > > >> > reusing the current `__consumer_offsets` topic is a better and
> > > > > > > > >> > more unified solution.
> > > > > > > >> >
> > > > > > > > >> > 7) Regarding the introduction of the expansionTimeoutMs config,
> > > > > > > > >> > it is mentioned that "we are using expansion timeout to replace
> > > > > > > > >> > rebalance timeout, which is configured by max.poll.intervals
> > > > > > > > >> > from client side, and using registration timeout to replace
> > > > > > > > >> > session timeout". Currently the default max.poll.interval.ms is
> > > > > > > > >> > configured to be 5 minutes and there will be only one rebalance
> > > > > > > > >> > if all new consumers can join within 5 minutes. So it is not
> > > > > > > > >> > clear whether we have a strong use-case for this new config.
> > > > > > > > >> > Can you explain what is the benefit of introducing this new
> > > > > > > > >> > config?
> > > > > > > > >> > Previously our goal was to use the expansion timeout as a
> > > > > > > > >> > workaround for the multiple rebalances triggered when
> > > > > > > > >> > scaling-up members do not join at the same time. We have
> > > > > > > > >> > decided to address this through a client-side protocol change,
> > > > > > > > >> > so we will not introduce the expansion timeout.
> > > > > > > >> >
> > > > > > > >> > 8) It is mentioned that "To distinguish between previous
> > > version
> > > > > of
> > > > > > > >> > protocol, we will also increase the join group request
> > version
> > > > to
> > > > > v4
> > > > > > > >> when
> > > > > > > >> > MEMBER_NAME is set" and "If the broker version is not the
> > > latest
> > > > > (<
> > > > > > > v4),
> > > > > > > >> > the join group request shall be downgraded to v3 without
> > > setting
> > > > > the
> > > > > > > >> member
> > > > > > > >> > Id". It is probably simpler to just say that this feature
> is
> > > > > enabled
> > > > > > > if
> > > > > > > >> > JoinGroupRequest V4 is supported on both client and broker
> > and
> > > > > > > >> MEMBER_NAME
> > > > > > > >> > is configured with non-empty string.
> > > > > > > >> > Yep, addressed this!
> > > > > > > >> >
> > > > > > > >> > 9) It is mentioned that broker may return
> > > > > NO_STATIC_MEMBER_INFO_SET
> > > > > > > >> error
> > > > > > > >> > in OffsetCommitResponse for "commit requests under static
> > > > > > membership".
> > > > > > > >> Can
> > > > > > > >> > you clarify how broker determines whether the commit
> request
> > > is
> > > > > > under
> > > > > > > >> > static membership?
> > > > > > > >> >
> > > > > > > >> > We have agreed that commit request shouldn't be affected
> by
> > > the
> > > > > new
> > > > > > > >> > membership, thus
> > > > > > > >> > removing it here. Thanks for catching this!
> > > > > > > >> >
> > > > > > > > >> > Let me know if you have further suggestions or concerns. Thank
> > > > > > > > >> > you for your valuable feedback to help me design the KIP
> > > > > > > > >> > better! (And I will try to address your feedback in the next
> > > > > > > > >> > round, Mayuresh.)
> > > > > > > >> >
> > > > > > > >> > Best,
> > > > > > > >> > Boyang
> > > > > > > >> > ________________________________
> > > > > > > >> > From: Mayuresh Gharat <gharatmayures...@gmail.com>
> > > > > > > >> > Sent: Wednesday, November 21, 2018 7:50 AM
> > > > > > > >> > To: dev@kafka.apache.org
> > > > > > > >> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > > > rebalances
> > > > > > by
> > > > > > > >> > specifying member id
> > > > > > > >> >
> > > > > > > >> > Hi Boyang,
> > > > > > > >> >
> > > > > > > > >> > Thanks for updating the KIP. This is a step in a good direction
> > > > > > > > >> > for stateful applications and also for mirroring applications
> > > > > > > > >> > whose latency is affected by the rebalance issues that we have
> > > > > > > > >> > today.
> > > > > > > >> >
> > > > > > > >> > I had a few questions on the current version of the KIP :
> > > > > > > >> > For the effectiveness of the KIP, consumer with
> member.name
> > > set
> > > > > > will
> > > > > > > >> *not
> > > > > > > >> > send leave group request* when they go offline
> > > > > > > >> >
> > > > > > > >> > > By this you mean, even if the application has not called
> > > > > > > >> > > KafkaConsumer.poll() within session timeout, it will not
> > be
> > > > > > sending
> > > > > > > >> the
> > > > > > > >> > > LeaveGroup request, right?
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > > >> > Broker will maintain an in-memory mapping of {member.name ->
> > > > > > > > >> > member.id} to track member uniqueness.
> > > > > > > >> >
> > > > > > > >> > > When is the member.name removed from this map?
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > > >> > Member.id must be set if the *member.name* is already
> > > > > > > > >> > within the map. Otherwise reply MISSING_MEMBER_ID
> > > > > > > >> >
> > > > > > > >> > > How is this case handled on the client side? What is the
> > > > > > application
> > > > > > > >> that
> > > > > > > > >> > > is using the KafkaConsumer supposed to do in this
> scenario?
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >> > Session timeout is the timeout we will trigger rebalance
> > when
> > > a
> > > > > > member
> > > > > > > >> goes
> > > > > > > >> > offline for too long (not sending heartbeat request). To
> > make
> > > > > static
> > > > > > > >> > membership effective, we should increase the default max
> > > session
> > > > > > > >> timeout to
> > > > > > > >> > 30 min so that end user could config it freely.
> > > > > > > >> >
> > > > > > > >> > > This would mean that it might take more time to detect
> > > unowned
> > > > > > topic
> > > > > > > >> > > partitions and may cause delay for applications that
> > perform
> > > > > data
> > > > > > > >> > mirroring
> > > > > > > >> > > tasks. I discussed this with our sre and we have a
> > > suggestion
> > > > to
> > > > > > > make
> > > > > > > >> > here
> > > > > > > >> > > as listed below separately.
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >> > Currently there is a config called *rebalance timeout*
> which
> > > is
> > > > > > > >> configured
> > > > > > > >> > by consumer *max.poll.intervals*. The reason we set it to
> > poll
> > > > > > > interval
> > > > > > > >> is
> > > > > > > >> > because consumer could only send request within the call
> of
> > > > poll()
> > > > > > and
> > > > > > > >> we
> > > > > > > >> > want to wait sufficient time for the join group request.
> > When
> > > > > > reaching
> > > > > > > >> > rebalance timeout, the group will move towards
> > > > completingRebalance
> > > > > > > stage
> > > > > > > >> > and remove unjoined groups
> > > > > > > >> >
> > > > > > > >> > > you meant remove unjoined members of the group, right ?
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >> > Currently there is a config called *rebalance timeout*
> which
> > > is
> > > > > > > >> configured
> > > > > > > >> > by consumer *max.poll.intervals*. The reason we set it to
> > poll
> > > > > > > interval
> > > > > > > >> is
> > > > > > > >> > because consumer could only send request within the call
> of
> > > > poll()
> > > > > > and
> > > > > > > >> we
> > > > > > > >> > want to wait sufficient time for the join group request.
> > When
> > > > > > reaching
> > > > > > > >> > rebalance timeout, the group will move towards
> > > > completingRebalance
> > > > > > > stage
> > > > > > > >> > and remove unjoined groups. This is actually conflicting
> > with
> > > > the
> > > > > > > >> design of
> > > > > > > >> > static membership, because those temporarily unavailable
> > > members
> > > > > > will
> > > > > > > >> > potentially reattempt the join group and trigger extra
> > > > rebalances.
> > > > > > > >> > Internally we would optimize this logic by having
> rebalance
> > > > > timeout
> > > > > > > >> only in
> > > > > > > >> > charge of stopping prepare rebalance stage, without
> removing
> > > > > > > >> non-responsive
> > > > > > > >> > members immediately.
> > > > > > > >> >
> > > > > > > >> > > What do you mean by " Internally we would optimize this
> > > logic
> > > > by
> > > > > > > >> having
> > > > > > > >> > > rebalance timeout only in charge of stopping prepare
> > > rebalance
> > > > > > > stage,
> > > > > > > >> > > without removing non-responsive members immediately."
> > There
> > > > > would
> > > > > > > not
> > > > > > > >> be
> > > > > > > >> > a
> > > > > > > >> > > full rebalance if the lagging consumer sent a JoinGroup
> > > > request
> > > > > > > later,
> > > > > > > >> > > right ? If yes, can you highlight this in the KIP ?
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >> > Scale Up
> > > > > > > >> >
> > > > > > > >> > > The KIP talks about scale up scenario but its not quite
> > > clear
> > > > > how
> > > > > > we
> > > > > > > >> > > handle it. Are we adding a separate "expansion.timeout"
> or
> > > we
> > > > > > adding
> > > > > > > >> > status
> > > > > > > >> > > "learner" ?. Can you shed more light on how this is
> > handled
> > > in
> > > > > the
> > > > > > > >> KIP,
> > > > > > > >> > if
> > > > > > > >> > > its handled?
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > *Discussion*
> > > > > > > >> > Larger session timeouts causing latency rise for getting
> > data
> > > > for
> > > > > > > >> un-owned
> > > > > > > >> > topic partitions :
> > > > > > > >> >
> > > > > > > >> > > I think Jason had brought this up earlier about having a
> > way
> > > > to
> > > > > > say
> > > > > > > >> how
> > > > > > > >> > > many members/consumer hosts are you choosing to be in
> the
> > > > > consumer
> > > > > > > >> group.
> > > > > > > >> > > If we can do this, then in case of mirroring
> applications
> > we
> > > > can
> > > > > > do
> > > > > > > >> this
> > > > > > > >> > :
> > > > > > > >> > > Lets say we have a mirroring application that consumes
> > from
> > > > > Kafka
> > > > > > > >> cluster
> > > > > > > >> > > A and produces to Kafka cluster B.
> > > > > > > >> > > Depending on the data and the Kafka cluster
> configuration,
> > > > Kafka
> > > > > > > >> service
> > > > > > > >> > > providers can set a mirroring group saying that it will
> > > take,
> > > > > for
> > > > > > > >> example
> > > > > > > >> > > 300 consumer hosts/members to achieve the desired
> > throughput
> > > > and
> > > > > > > >> latency
> > > > > > > >> > > for mirroring and can have additional 10 consumer hosts
> as
> > > > spare
> > > > > > in
> > > > > > > >> the
> > > > > > > >> > > same group.
> > > > > > > > >> > > So the first 300 members/consumers to join the group will
> > > > > > > > >> > > start mirroring the data from Kafka cluster A to Kafka
> > > > > > > > >> > > cluster B.
> > > > > > > > >> > > The remaining 10 consumer members can sit idle.
> > > > > > > > >> > > The moment one of the consumers (for example: consumer
> > > > > > > > >> > > number 54) from the first 300 members goes out of the group
> > > > > > > > >> > > (crossed session timeout), it (the groupCoordinator) can
> > > > > > > > >> > > just assign the topicPartitions from consumer member 54 to
> > > > > > > > >> > > one of the spare hosts.
> > > > > > > >> > > Once the consumer member 54 comes back up, it can start
> as
> > > > > being a
> > > > > > > >> part
> > > > > > > >> > of
> > > > > > > >> > > the spare pool.
> > > > > > > >> > > This enables us to have lower session timeouts and low
> > > latency
> > > > > > > >> mirroring,
> > > > > > > >> > > in cases where the service providers are OK with having
> > > spare
> > > > > > hosts.
> > > > > > > >> > > This would mean that we would tolerate n consumer
> members
> > > > > leaving
> > > > > > > and
> > > > > > > >> > > rejoining the group and still provide low latency as
> long
> > > as n
> > > > > <=
> > > > > > > >> number
> > > > > > > >> > of
> > > > > > > >> > > spare consumers.
> > > > > > > >> > > If there are no spare host available, we can get back to
> > the
> > > > > idea
> > > > > > as
> > > > > > > >> > > described in the KIP.
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >> > Thanks,
> > > > > > > >> >
> > > > > > > >> > Mayuresh
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > On Tue, Nov 20, 2018 at 10:18 AM Konstantine Karantasis <
> > > > > > > >> > konstant...@confluent.io> wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi Boyang.
> > > > > > > >> > >
> > > > > > > >> > > Thanks for preparing this KIP! It is making good
> progress
> > > and
> > > > > will
> > > > > > > be
> > > > > > > >> a
> > > > > > > >> > > great improvement for stateful Kafka applications.
> > > > > > > >> > >
> > > > > > > >> > > Apologies for my late reply, I was away for a while.
> Lots
> > of
> > > > > great
> > > > > > > >> > comments
> > > > > > > >> > > so far, so I'll probably second most of them in what I
> > > suggest
> > > > > > below
> > > > > > > >> at
> > > > > > > >> > > this point.
> > > > > > > >> > >
> > > > > > > >> > > When I first read the KIP, I wanted to start at the end
> > with
> > > > > > > something
> > > > > > > >> > that
> > > > > > > >> > > wasn't highlighted a lot. That was the topic related to
> > > > handling
> > > > > > > >> > duplicate
> > > > > > > >> > > members. I see now that the initial suggestion of
> handling
> > > > this
> > > > > > > >> situation
> > > > > > > >> > > during offset commit has been removed, and I agree with
> > > that.
> > > > > > Issues
> > > > > > > >> > > related to membership seem to be handled better when the
> > > > member
> > > > > > > joins
> > > > > > > >> the
> > > > > > > >> > > group rather than when it tries to commit offsets. This
> > also
> > > > > > > >> simplifies
> > > > > > > >> > how
> > > > > > > >> > > many request types need to change in order to
> incorporate
> > > the
> > > > > new
> > > > > > > >> member
> > > > > > > >> > > name field.
> > > > > > > >> > >
> > > > > > > > >> > > I also agree with what Jason and Guozhang have said
> > > > > > > > >> > > regarding timeouts. Although semantically, it's easier
> > > > > > > > >> > > to think of every operation having its own timeout,
> > > > > > > > >> > > operationally this can become a burden. Thus,
> > > > > > > > >> > > consolidation seems preferable here. The definition of
> > > > > > > > >> > > embedded protocols on top of the base group membership
> > > > > > > > >> > > protocol for rebalancing gives enough flexibility to
> > > > > > > > >> > > address such needs in each client component separately.
> > > > > > > >> > >
> > > > > > > > >> > > Finally, some minor comments:
> > > > > > > > >> > > In a few places the new/proposed changes are referred to
> > > > > > > > >> > > as "current", which is a bit confusing considering that
> > > > > > > > >> > > there is a protocol in place already, and by "current"
> > > > > > > > >> > > someone might understand the existing one. I'd recommend
> > > > > > > > >> > > using new/proposed or equivalent when referring to
> > > > > > > > >> > > changes introduced with KIP-345 and current/existing or
> > > > > > > > >> > > equivalent when referring to existing behavior.
> > > > > > > >> > >
> > > > > > > > >> > > There's the following sentence in the "Public Interfaces"
> > > > > > > > >> > > section:
> > > > > > > > >> > > "Since for many stateful consumer/stream applications,
> > > > > > > > >> > > the state shuffling is more painful than short time
> > > > > > > > >> > > partial unavailability."
> > > > > > > > >> > > However, my understanding is that the changes proposed
> > > > > > > > >> > > with KIP-345 will not exploit any partial availability.
> > > > > > > > >> > > A suggestion for dealing with temporary imbalances has
> > > > > > > > >> > > been made in "Incremental Cooperative Rebalancing" which
> > > > > > > > >> > > can work well with KIP-345, but here I don't see
> > > > > > > > >> > > proposed changes that suggest that some resources (e.g.
> > > > > > > > >> > > partitions) will keep being used while others will not
> > > > > > > > >> > > be utilized. Thus, you might want to adjust this
> > > > > > > > >> > > sentence. Correct me if I'm missing something related to
> > > > > > > > >> > > that.
> > > > > > > >> > >
> > > > > > > > >> > > In the rejected alternatives, under point 2) I read "we
> > > > > > > > >> > > can copy the member id to the config files". I believe
> > > > > > > > >> > > it means to say "member name" unless I'm missing
> > > > > > > > >> > > something about reusing member ids. Also below I read:
> > > > > > > > >> > > "By allowing consumers to optionally specifying a member
> > > > > > > > >> > > id" which probably implies "member name" again. In a
> > > > > > > > >> > > sense this section highlights a potential confusion
> > > > > > > > >> > > between member name and member id. I wonder if we could
> > > > > > > > >> > > come up with a better term for the new field. StaticTag,
> > > > > > > > >> > > StaticLabel, or even StaticName are some suggestions
> > > > > > > > >> > > that could potentially help with confusion between
> > > > > > > > >> > > MemberId and MemberName and what corresponds to what.
> > > > > > > > >> > > But I wouldn't like to disrupt the discussion with
> > > > > > > > >> > > naming conventions too much at this point. I just
> > > > > > > > >> > > mention it here as a thought.
> > > > > > > >> > >
> > > > > > > > >> > > Looking forward to seeing the final details of this KIP.
> > > > > > > > >> > > Great work so far!
> > > > > > > >> > >
> > > > > > > >> > > Konstantine
> > > > > > > >> > >
> > > > > > > >> > >
> > > > > > > > >> > > On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen
> > > > > > > > >> > > <bche...@outlook.com> wrote:
> > > > > > > >> > >
> > > > > > > > >> > > > Thanks Guozhang for the great summary here, and I have
> > > > > > > > >> > > > been following up on the action items here.
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > > >> > > >   1.  I already updated the KIP to remove the expansion
> > > > > > > > >> > > > timeout and registration timeout. Great to see them
> > > > > > > > >> > > > being addressed on the client side!
> > > > > > > > >> > > >   2.  I double checked the design and I believe that it
> > > > > > > > >> > > > is ok to have both static and dynamic members co-exist
> > > > > > > > >> > > > in the same group. So the upgrade shouldn't be
> > > > > > > > >> > > > destructive and we are removing the two membership
> > > > > > > > >> > > > protocol switching APIs.
> > > > > > > > >> > > >   3.  I only have a question about this one. I'm still
> > > > > > > > >> > > > reading the KafkaApis code here. Should I just use the
> > > > > > > > >> > > > same authorization logic for
> > > > > > > > >> > > > ForceStaticRebalanceRequest as JoinGroupRequest?
> > > > > > > > >> > > >   4.  I'm very excited to see this work with K8! Like
> > > > > > > > >> > > > you suggested, this feature could be better addressed
> > > > > > > > >> > > > in a separate KIP because it is pretty independent. I
> > > > > > > > >> > > > could start drafting the KIP once the current proposal
> > > > > > > > >> > > > is approved.
> > > > > > > > >> > > >   5.  I believe that we don't need fencing in the offset
> > > > > > > > >> > > > commit request, since the duplicate member.name issue
> > > > > > > > >> > > > could be handled by the join group request. We shall
> > > > > > > > >> > > > reject a join group with a known member name but no
> > > > > > > > >> > > > member id (which means we already have an active member
> > > > > > > > >> > > > using this identity).
> > > > > > > > >> > > >   6.  I agree to remove that internal config once we
> > > > > > > > >> > > > move forward with static membership. And I already
> > > > > > > > >> > > > removed the entire section from the KIP.
> > > > > > > >> > > >
> > > > > > > >> > > > Let me know if you have other concerns.
> > > > > > > >> > > >
> > > > > > > >> > > > Best,
> > > > > > > >> > > > Boyang
> > > > > > > >> > > > ________________________________
> > > > > > > >> > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > > > > >> > > > Sent: Tuesday, November 20, 2018 4:21 PM
> > > > > > > >> > > > To: dev
> > > > > > > > >> > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > > > > > > >> > > > rebalances by specifying member id
> > > > > > > >> > > >
> > > > > > > >> > > > Hello Boyang,
> > > > > > > >> > > >
> > > > > > > > >> > > > Thanks a lot for the KIP! It is a great write-up and I
> > > > > > > > >> > > > appreciate your patience in answering the feedback from
> > > > > > > > >> > > > the community. I'd like to add my 2 cents here:
> > > > > > > >> > > >
> > > > > > > > >> > > > 1. By introducing another two timeout configs,
> > > > > > > > >> > > > registration_timeout and expansion_timeout, we are
> > > > > > > > >> > > > effectively having four timeout configs: session
> > > > > > > > >> > > > timeout, rebalance timeout (configured as
> > > > > > > > >> > > > "max.poll.interval.ms" on client side), and these two.
> > > > > > > > >> > > > The interplay of these timeout configs can be quite
> > > > > > > > >> > > > hard for users to reason about, and hence I'm wondering
> > > > > > > > >> > > > if we can simplify the situation with as few timeout
> > > > > > > > >> > > > configs as possible. Here is a concrete suggestion I'd
> > > > > > > > >> > > > like to propose:
> > > > > > > >> > > >
> > > > > > > > >> > > > 1.a) Instead of introducing a registration_timeout in
> > > > > > > > >> > > > addition to the session_timeout for static members, we
> > > > > > > > >> > > > can just reuse the session_timeout and ask users to set
> > > > > > > > >> > > > it to a larger value when they are upgrading a dynamic
> > > > > > > > >> > > > client to a static client by setting the "member.name"
> > > > > > > > >> > > > at the same time. By default, the broker-side
> > > > > > > > >> > > > min.session.timeout is 6 seconds and
> > > > > > > > >> > > > max.session.timeout is 5 minutes, which seems
> > > > > > > > >> > > > reasonable to me (we can of course modify this broker
> > > > > > > > >> > > > config to enlarge the valid interval if we want in
> > > > > > > > >> > > > practice). And then we should also consider removing
> > > > > > > > >> > > > the condition for marking a client as failed if the
> > > > > > > > >> > > > rebalance timeout has been reached while the JoinGroup
> > > > > > > > >> > > > was not received, so that the semantics of
> > > > > > > > >> > > > session_timeout and rebalance_timeout are totally
> > > > > > > > >> > > > separated: the former is only used to determine if a
> > > > > > > > >> > > > consumer member of the group should be marked as failed
> > > > > > > > >> > > > and kicked out of the group, and the latter is only
> > > > > > > > >> > > > used to determine the longest time the coordinator
> > > > > > > > >> > > > should wait in the PREPARE_REBALANCE phase. In other
> > > > > > > > >> > > > words, if a member did not send the JoinGroup within
> > > > > > > > >> > > > the rebalance_timeout, we still include it in the new
> > > > > > > > >> > > > generation of the group and use its old subscription
> > > > > > > > >> > > > info to send to the leader for assignment. Later, if
> > > > > > > > >> > > > the member comes back with a HeartBeat request, we can
> > > > > > > > >> > > > still follow the normal path to bring it to the latest
> > > > > > > > >> > > > generation while checking that its sent JoinGroup
> > > > > > > > >> > > > request contains the same subscription info as we used
> > > > > > > > >> > > > to assign the partitions previously (which should
> > > > > > > > >> > > > likely be the case in practice). In addition, we should
> > > > > > > > >> > > > let static members not send the LeaveGroup request when
> > > > > > > > >> > > > they are gracefully shut down, so that a static member
> > > > > > > > >> > > > can only leave the group if its session has timed out,
> > > > > > > > >> > > > OR it has been indicated to not exist in the group any
> > > > > > > > >> > > > more (details below).
> > > > > > > >> > > >
> > > > > > > > >> > > > 1.b) We have a parallel discussion about Incremental
> > > > > > > > >> > > > Cooperative Rebalancing, in which we will encode the
> > > > > > > > >> > > > "when to rebalance" logic at the application level,
> > > > > > > > >> > > > instead of at the protocol level. By doing this we can
> > > > > > > > >> > > > also enable a few other optimizations, e.g. at the
> > > > > > > > >> > > > Streams level to first build up the state store as
> > > > > > > > >> > > > standby tasks and then trigger a second rebalance to
> > > > > > > > >> > > > actually migrate the active tasks while keeping the
> > > > > > > > >> > > > actual rebalance latency and hence unavailability
> > > > > > > > >> > > > window small
> > > > > > > > >> > > > (https://issues.apache.org/jira/browse/KAFKA-6145).
> > > > > > > > >> > > > I'd propose we align KIP-345 with this idea, and hence
> > > > > > > > >> > > > do not add the expansion_timeout as part of the
> > > > > > > > >> > > > protocol layer, but only do that at the application's
> > > > > > > > >> > > > coordinator / assignor layer (Connect, Streams, etc).
> > > > > > > > >> > > > We can still deprecate the
> > > > > > > > >> > > > "group.initial.rebalance.delay.ms" though as part of
> > > > > > > > >> > > > this KIP, since we have discussed its limits and think
> > > > > > > > >> > > > it is actually not a very good design and could be
> > > > > > > > >> > > > replaced with the client-side logic above.
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > > >> > > > 2. I'd like to see your thoughts on the upgrade path
> > > > > > > > >> > > > for this KIP. More specifically, let's say after we
> > > > > > > > >> > > > have upgraded the broker version to be able to
> > > > > > > > >> > > > recognize the new versions of the JoinGroup request and
> > > > > > > > >> > > > the admin requests, how should we upgrade the clients
> > > > > > > > >> > > > and enable static groups? Off the top of my head, if we
> > > > > > > > >> > > > do a rolling bounce in which we set the member.name
> > > > > > > > >> > > > config as well as optionally increase the
> > > > > > > > >> > > > session.timeout config when we bounce each instance,
> > > > > > > > >> > > > then during this rolling bounce we will have a group
> > > > > > > > >> > > > containing both dynamic members and static members. It
> > > > > > > > >> > > > means that we should have the group allow such a
> > > > > > > > >> > > > scenario (i.e. we cannot reject JoinGroup requests from
> > > > > > > > >> > > > dynamic members), and hence the "member.name" ->
> > > > > > > > >> > > > "member.id" mapping will only be partial in this
> > > > > > > > >> > > > scenario. Also could you describe if the upgrade to the
> > > > > > > > >> > > > first version that supports this feature would ever get
> > > > > > > > >> > > > any benefits, or only the future upgrade path for
> > > > > > > > >> > > > rolling bounces could get benefits out of this feature?
> > > > > > > >> > > >
> > > > > > > > >> > > > If that's the case and we will do 1) as suggested
> > > > > > > > >> > > > above, do we still need the enableStaticMembership and
> > > > > > > > >> > > > enableDynamicMembership admin requests any more? Seems
> > > > > > > > >> > > > it is not necessary any more as we will only have the
> > > > > > > > >> > > > notion of "dynamic or static members" that can co-exist
> > > > > > > > >> > > > in a group while there is no notion of "dynamic or
> > > > > > > > >> > > > static groups", and hence these two requests are not
> > > > > > > > >> > > > needed anymore.
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > > >> > > > 3. We need to briefly talk about the implications for
> > > > > > > > >> > > > ACL as we introduce new admin requests that are related
> > > > > > > > >> > > > to a specific group.id. For example, we need to make
> > > > > > > > >> > > > sure that whoever created the group or joined the group
> > > > > > > > >> > > > can actually send admin requests for the group,
> > > > > > > > >> > > > otherwise the application owners need to bother the
> > > > > > > > >> > > > Kafka operators on a multi-tenant cluster every time
> > > > > > > > >> > > > they want to send any admin requests for their groups,
> > > > > > > > >> > > > which would be an operational nightmare.
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > > >> > > > 4. I like Jason's suggestion of adding an optional
> > > > > > > > >> > > > field for the list of member names, and I'm wondering
> > > > > > > > >> > > > if that can be done as part of the forceStaticRebalance
> > > > > > > > >> > > > request: i.e. by passing a list of members, we will
> > > > > > > > >> > > > enforce a rebalance immediately since it indicates that
> > > > > > > > >> > > > some static member will be officially kicked out of the
> > > > > > > > >> > > > group and some new static members may be added. So back
> > > > > > > > >> > > > to 1.a) above, a static member can only be kicked out
> > > > > > > > >> > > > of the group if a) its session (arguably a long period
> > > > > > > > >> > > > of time) has timed out, or b) this admin request
> > > > > > > > >> > > > explicitly states that it is no longer part of the
> > > > > > > > >> > > > group. As for execution I'm fine with keeping it as
> > > > > > > > >> > > > future work of this KIP if you'd like to make its scope
> > > > > > > > >> > > > smaller.
> > > > > > > >> > > >
> > > > > > > >> > > > Following are minor comments:
> > > > > > > >> > > >
> > > > > > > > >> > > > 5. I'm not sure if we need to include "member.name" as
> > > > > > > > >> > > > part of the OffsetCommitRequest for fencing purposes,
> > > > > > > > >> > > > as I think the memberId plus the generation number
> > > > > > > > >> > > > should be sufficient for fencing even with static
> > > > > > > > >> > > > members.
> > > > > > > >> > > >
> > > > > > > > >> > > > 6. As mentioned above, if we agree to do 1) we can get
> > > > > > > > >> > > > rid of the "LEAVE_GROUP_ON_CLOSE_CONFIG" config.
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > Guozhang
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > > >> > > > On Sat, Nov 17, 2018 at 5:53 PM Dong Lin
> > > > > > > > >> > > > <lindon...@gmail.com> wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Hey Boyang,
> > > > > > > >> > > > >
> > > > > > > > >> > > > > Thanks for the proposal! This is very useful. I have
> > > > > > > > >> > > > > some comments below:
> > > > > > > >> > > > >
> > > > > > > > >> > > > > 1) The motivation currently explicitly states that
> > > > > > > > >> > > > > the goal is to improve performance for heavy-state
> > > > > > > > >> > > > > applications. It seems that the motivation can be
> > > > > > > > >> > > > > stronger with the following use-case. Currently for a
> > > > > > > > >> > > > > MirrorMaker cluster with e.g. 100 MirrorMaker
> > > > > > > > >> > > > > processes, it will take a long time to rolling bounce
> > > > > > > > >> > > > > the entire MirrorMaker cluster. Each MirrorMaker
> > > > > > > > >> > > > > process restart will trigger a rebalance which
> > > > > > > > >> > > > > currently pauses the consumption of all partitions of
> > > > > > > > >> > > > > the MirrorMaker cluster. With the change stated in
> > > > > > > > >> > > > > this patch, as long as a MirrorMaker can restart
> > > > > > > > >> > > > > within the specified timeout (e.g. 2 minutes), then
> > > > > > > > >> > > > > we only need a constant number of rebalances (e.g.
> > > > > > > > >> > > > > for leader restart) for the entire rolling bounce,
> > > > > > > > >> > > > > which will significantly improve the availability of
> > > > > > > > >> > > > > the MirrorMaker pipeline. In my opinion, the main
> > > > > > > > >> > > > > benefit of the KIP is to avoid unnecessary rebalances
> > > > > > > > >> > > > > if the consumer process can be restarted soon, which
> > > > > > > > >> > > > > helps performance even if the overhead of state
> > > > > > > > >> > > > > shuffling for a given process is small.
> > > > > > > >> > > > >
> > > > > > > > >> > > > > 2) In order to simplify the KIP reading, can you
> > > > > > > > >> > > > > follow the writeup style of other KIPs (e.g. KIP-98)
> > > > > > > > >> > > > > and list the interface changes such as new configs
> > > > > > > > >> > > > > (e.g. registration timeout), new request/response,
> > > > > > > > >> > > > > new AdminClient API and new error code (e.g.
> > > > > > > > >> > > > > DUPLICATE_STATIC_MEMBER)? Currently some of these are
> > > > > > > > >> > > > > specified in the Proposed Change section, which makes
> > > > > > > > >> > > > > it a bit inconvenient to understand the new interface
> > > > > > > > >> > > > > that will be exposed to the user. Explanation of the
> > > > > > > > >> > > > > current two-phase rebalance protocol probably can be
> > > > > > > > >> > > > > moved out of the public interface section.
> > > > > > > >> > > > >
> > > > > > > > >> > > > > 3) There are currently two versions of
> > > > > > > > >> > > > > JoinGroupRequest in the KIP and only one of them has
> > > > > > > > >> > > > > the field memberId. This seems confusing.
> > > > > > > >> > > > >
> > > > > > > > >> > > > > 4) It is mentioned in the KIP that "An admin API to
> > > > > > > > >> > > > > force rebalance could be helpful here, but we will
> > > > > > > > >> > > > > make a call once we finished the major
> > > > > > > > >> > > > > implementation". So this seems to be still an open
> > > > > > > > >> > > > > question in the current design. We probably want to
> > > > > > > > >> > > > > agree on this before voting for the KIP.
> > > > > > > >> > > > >
> > > > > > > > >> > > > > 5) The KIP currently adds a new config MEMBER_NAME
> > > > > > > > >> > > > > for the consumer. Can you specify the name of the
> > > > > > > > >> > > > > config key and the default config value? Possible
> > > > > > > > >> > > > > default values include empty string or null (similar
> > > > > > > > >> > > > > to transactional.id in the producer config).
> > > > > > > >> > > > >
> > > > > > > > >> > > > > 6) Regarding the use of the topic "static_member_map"
> > > > > > > > >> > > > > to persist the member name map, currently if the
> > > > > > > > >> > > > > consumer coordinator broker goes offline, a rebalance
> > > > > > > > >> > > > > is triggered and consumers will try to connect to the
> > > > > > > > >> > > > > new coordinator. If these consumers can connect to
> > > > > > > > >> > > > > the new coordinator within max.poll.interval.ms,
> > > > > > > > >> > > > > which by default is 5 minutes, given that the broker
> > > > > > > > >> > > > > can use a deterministic algorithm to determine the
> > > > > > > > >> > > > > partition -> member_name mapping, each consumer
> > > > > > > > >> > > > > should get assigned the same set of partitions
> > > > > > > > >> > > > > without requiring state shuffling. So it is not clear
> > > > > > > > >> > > > > whether we have a strong use-case for this new logic.
> > > > > > > > >> > > > > Can you help clarify what the benefit is of using the
> > > > > > > > >> > > > > topic "static_member_map" to persist the member name
> > > > > > > > >> > > > > map?
> > > > > > > >> > > > >
> > > > > > > > >> > > > > 7) Regarding the introduction of the
> > > > > > > > >> > > > > expensionTimeoutMs config, it is mentioned that "we
> > > > > > > > >> > > > > are using expansion timeout to replace rebalance
> > > > > > > > >> > > > > timeout, which is configured by max.poll.intervals
> > > > > > > > >> > > > > from client side, and using registration timeout to
> > > > > > > > >> > > > > replace session timeout". Currently the default
> > > > > > > > >> > > > > max.poll.interval.ms is configured to be 5 minutes
> > > > > > > > >> > > > > and there will be only one rebalance if all new
> > > > > > > > >> > > > > consumers can join within 5 minutes. So it is not
> > > > > > > > >> > > > > clear whether we have a strong use-case for this new
> > > > > > > > >> > > > > config. Can you explain what the benefit is of
> > > > > > > > >> > > > > introducing this new config?
> > > > > > > >> > > > >
> > > > > > > > >> > > > > 8) It is mentioned that "To distinguish between
> > > > > > > > >> > > > > previous version of protocol, we will also increase
> > > > > > > > >> > > > > the join group request version to v4 when MEMBER_NAME
> > > > > > > > >> > > > > is set" and "If the broker version is not the latest
> > > > > > > > >> > > > > (< v4), the join group request shall be downgraded to
> > > > > > > > >> > > > > v3 without setting the member Id". It is probably
> > > > > > > > >> > > > > simpler to just say that this feature is enabled if
> > > > > > > > >> > > > > JoinGroupRequest V4 is supported on both client and
> > > > > > > > >> > > > > broker and MEMBER_NAME is configured with a non-empty
> > > > > > > > >> > > > > string.
> > > > > > > >> > > > >
> > > > > > > > >> > > > > 9) It is mentioned that the broker may return a
> > > > > > > > >> > > > > NO_STATIC_MEMBER_INFO_SET error in
> > > > > > > > >> > > > > OffsetCommitResponse for "commit requests under
> > > > > > > > >> > > > > static membership". Can you clarify how the broker
> > > > > > > > >> > > > > determines whether the commit request is under static
> > > > > > > > >> > > > > membership?
> > > > > > > >> > > > >
> > > > > > > >> > > > > Thanks,
> > > > > > > >> > > > > Dong
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > > > --
> > > > > > > >> > > > -- Guozhang
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > --
> > > > > > > >> > -Regards,
> > > > > > > >> > Mayuresh R. Gharat
> > > > > > > >> > (862) 250-7125
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -Regards,
> > > > > > > > Mayuresh R. Gharat
> > > > > > > > (862) 250-7125
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -Regards,
> > > > > > > Mayuresh R. Gharat
> > > > > > > (862) 250-7125
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -Regards,
> > > > > Mayuresh R. Gharat
> > > > > (862) 250-7125
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang
