Yes, I think it makes sense to let KafkaStreams expose the embedded consumer client-ids and instance-ids, e.g. in the ThreadMetadata exposed via `localThreadsMetadata`.

I can open up a new KIP for the change on the Streams side so that we can keep this KIP on the consumer side only.

Guozhang

On Fri, Nov 30, 2018 at 9:31 PM Boyang Chen <bche...@outlook.com> wrote:
> Thanks Guozhang and Mayuresh for the follow-up here.
>
> > Also I was thinking if we can have a replace API that takes in a map of old to new instance ids, such that we can replace a consumer. If we have this API and a consumer host goes down due to hardware issues, we can have another host spin up and take its place. This is like a cold backup, which can be a step towards providing the hot backup that we discussed earlier in the KIP.
>
> I like Mayuresh's suggestion, and I think we could prepare follow-up work to add a replace API once KIP-345 is done. For the very first version I feel this is not a must-have.
>
> > For Streams, I think we do not need an extra config for the instance id; instead, we can re-use the way we construct the embedded consumer's client id:
> >
> > [streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"
> >
> > So as long as users specify a unique Streams client-id, the resulting consumer client-id / instance-id should already be unique as well.
>
> So, Guozhang, you mean Streams will enable static membership automatically, correct? That would make the logic simpler and require fewer code changes on the Streams side.
>
> > As for the LeaveGroupRequest, as I understand it, your concern is that when we are shutting down a single Streams instance that may contain multiple threads, shutting down that instance would mean shutting down multiple members. Personally I'd prefer to make the LeaveGroupRequest API more general and less inclined to Streams (I think Mayuresh also suggested this). So I'd suggest that we keep the LeaveGroupRequest API as suggested, i.e. a list of member.instance.ids. And in Streams we can add a new API in KafkaStreams to expose:
> >
> > 1) the list of embedded consumer / producer client ids,
> > 2) the producer's txn ids if EOS is turned on, and
> > 3) the consumer's instance ids.
>
> I agree with the suggestion to make the leave group request change generic. So this new Streams API will be added on the rest layer to expose the necessary ids, correct?
>
> Looking forward to your confirmation 😊
>
> Best,
> Boyang
>
> ------------------------------
> *From:* Guozhang Wang <wangg...@gmail.com>
> *Sent:* Saturday, December 1, 2018 7:00 AM
> *To:* dev
> *Subject:* Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id
>
> Hi Boyang,
>
> For Streams, I think we do not need an extra config for the instance id; instead, we can re-use the way we construct the embedded consumer's client id:
>
> [streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"
>
> So as long as users specify a unique Streams client-id, the resulting consumer client-id / instance-id should already be unique as well.
>
> As for the LeaveGroupRequest, as I understand it, your concern is that when we are shutting down a single Streams instance that may contain multiple threads, shutting down that instance would mean shutting down multiple members. Personally I'd prefer to make the LeaveGroupRequest API more general and less inclined to Streams (I think Mayuresh also suggested this). So I'd suggest that we keep the LeaveGroupRequest API as suggested, i.e. a list of member.instance.ids. And in Streams we can add a new API in KafkaStreams to expose:
>
> 1) the list of embedded consumer / producer client ids,
> 2) the producer's txn ids if EOS is turned on, and
> 3) the consumer's instance ids.
>
> That way, Streams operators can read those values from KafkaStreams directly before shutting it down and use the list in the LeaveGroupRequest API. How about that?
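The naming scheme quoted above can be sketched in a few lines of Python, together with the operator workflow of collecting every thread's id before a shutdown. This is a minimal sketch, not Streams code. It assumes thread ids are numbered from 1 and that a consumer's instance id equals its derived client id. Both helper names are hypothetical.

```python
from typing import List


def streams_consumer_client_id(streams_client_id: str, thread_id: int) -> str:
    """Build the embedded consumer's client id from the quoted pattern:
    [streams client-id] + "-StreamThread-" + [thread-id] + "-consumer"."""
    return f"{streams_client_id}-StreamThread-{thread_id}-consumer"


def streams_instance_ids(streams_client_id: str, num_threads: int) -> List[str]:
    """Hypothetical helper: the instance ids an operator would collect from one
    Streams instance before shutting it down. Assumes thread ids run from 1 to
    num_threads and that each consumer's instance id equals its client id."""
    return [
        streams_consumer_client_id(streams_client_id, thread_id)
        for thread_id in range(1, num_threads + 1)
    ]


# Two Streams instances with distinct client ids yield disjoint instance ids,
# so the combined list can go into one batched leave-group call.
ids_to_remove = streams_instance_ids("app-a", 2) + streams_instance_ids("app-b", 2)
```

Uniqueness of the derived ids depends entirely on users choosing unique Streams client-ids, which is the condition the thread states.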
>
> Guozhang
>
> On Fri, Nov 30, 2018 at 7:45 AM Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > I like Guozhang's suggestion to not have to wait for the session timeout in case we know that we want to downsize the consumer group and redistribute the partitions among the remaining consumers.
> > IIUC, with the above suggestions, the admin API "removeMemberFromGroup(groupId, list[instanceId])" or "removeMemberFromGroup(groupId, instanceId)" will automatically cause a rebalance, right?
> > I would prefer list[instanceId] because that's the more general scenario.
> >
> > Also I was thinking if we can have a replace API that takes in a map of old to new instance ids, such that we can replace a consumer. If we have this API and a consumer host goes down due to hardware issues, we can have another host spin up and take its place. This is like a cold backup, which can be a step towards providing the hot backup that we discussed earlier in the KIP.
> > Thoughts?
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Thu, Nov 29, 2018 at 1:30 AM Boyang Chen <bche...@outlook.com> wrote:
> > >
> > > In fact, I feel it's more convenient for users to specify a list of instance id prefixes, because for a general consumer application we can't always find a single proper prefix to remove a list of consumers. So we could either add list[instanceId prefix], or add two fields, an instanceId prefix and list[instanceId], for clarity. As you know, the two options are equivalent, since a full name is a special case of a prefix.
> > >
> > > Let me know your thoughts!
> > >
> > > Boyang
> > > ________________________________
> > > From: Boyang Chen <bche...@outlook.com>
> > > Sent: Thursday, November 29, 2018 3:39 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id
> > >
> > > Thanks Guozhang for the new proposal here!
> > >
> > > So I'd like to propose a slightly modified version of LeaveGroupRequest: instead of letting the static member consumer clients send the request themselves (which means we still need some hidden configs to turn it off, like we do today), how about just letting any other client send this request, since the LeaveGroupRequest only requires group.id and member.id? So back to your operational scenarios: if some static member has been found crashed and is not likely to come back, or we simply want to shrink the size of the group by shutting down some static members, we can use an admin client to send the LeaveGroupRequest after the instance has been completely shut down or crashed, to kick them out of the group and also trigger the rebalance.
> > >
> > > One issue, though, is that users may not know the member id required in the LeaveGroupRequest. To work around it, we can add the `group.instance.id` along with the member id and then allow the member id to be nullable. The coordinator logic would then be modified as follows: 1) if member.id is specified, ignore instance.id and always use member.id to find the member to kick out; 2) otherwise, try the instance.id to find the corresponding member.id and kick it out; 3) if none is found, reject with an error code.
> > >
> > > So in sum, the alternative changes are:
> > >
> > > a) Modify LeaveGroupRequest to add group.instance.id.
> > > b) Modify the coordinator logic to handle such requests on the broker side.
> > > c) Add a new API in AdminClient like "removeMemberFromGroup(groupId, instanceId)", which will be translated into a LeaveGroupRequest.
> > > d) [Optional] We can even batch the request by allowing "removeMemberFromGroup(groupId, list[instanceId])" and then make the `member.id` and `instance.id` fields of LeaveGroupRequest arrays instead of single entries.
> > > e) We can also remove the admin ConsumerRebalanceRequest for simplicity (why not? I'm paranoid about having as few request protocols as possible :), as it is not needed anymore with the above proposal.
> > >
> > > I agree that reusing LeaveGroupRequest is actually a good idea: we only need to iterate over an existing request format. Also, I found that we haven't discussed how we want to enable this feature on Streams applications, which differ from common consumer applications in that a Streams app uses each stream thread as an individual consumer. For example, if the user specifies the client id, the stream consumer client id will be like:
> > >
> > > User client id + "-StreamThread-" + thread id + "-consumer"
> > >
> > > So I'm thinking we should do something similar for defining group.instance.id on Streams. We shall define another config called `stream.instance.id`, which would be used as a prefix, and for each thread consumer the formula will look like:
> > >
> > > `group.instance.id` = `stream.instance.id` + "-" + thread id + "-consumer"
> > >
> > > And for ease of use, the interface of the leave group request could include `group.instance.id.prefix` instead of `group.instance.id`, so that we could batch remove the consumers relating to a single stream instance. This is more intuitive and flexible, since specifying the names of 16~32 * n (n = number of stream instances to shut down) consumers is not an easy job without client management tooling.
> > >
> > > How does this workaround sound?
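The point that a full instance id is just a special case of a prefix can be shown with a small matching routine. This is a minimal sketch. It assumes the coordinator keeps a set of static instance ids built with the `stream.instance.id` + "-" + thread id + "-consumer" formula above, and that each request entry is treated as a prefix. `match_instance_ids` is a hypothetical name, not a proposed API.

```python
from typing import Iterable, List, Set


def match_instance_ids(static_ids: Set[str], prefixes: Iterable[str]) -> List[str]:
    """Return, sorted, every static instance id that starts with any given
    prefix. A full instance id passed as a "prefix" matches itself, so an
    exact-id list is a special case of a prefix list."""
    prefix_list = list(prefixes)
    return sorted(i for i in static_ids if any(i.startswith(p) for p in prefix_list))


group = {
    "app-a-1-consumer",  # stream.instance.id "app-a", thread 1
    "app-a-2-consumer",  # stream.instance.id "app-a", thread 2
    "app-b-1-consumer",  # stream.instance.id "app-b", thread 1
}
# Remove every thread consumer of stream instance "app-a" with one prefix.
to_remove = match_instance_ids(group, ["app-a-"])
```

The sketch also exposes a caveat. Prefix matching is only safe if no stream instance id is itself a prefix of another (e.g. `app-1` vs `app-10`). That is one argument for keeping an exact-id list alongside a prefix field.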
> > >
> > > Boyang
> > > ________________________________
> > > From: Guozhang Wang <wangg...@gmail.com>
> > > Sent: Thursday, November 29, 2018 2:38 AM
> > > To: dev
> > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id
> > >
> > > Hi Boyang,
> > >
> > > I was thinking that with the optional static members in the admin ConsumerRebalanceRequest, it should be sufficient to kick out the static members before their session timeout (arguably long in practice) is reached. But now I see your concern is that in some situations the admin operators may not even know the full list of static members, but ONLY know which static member has failed and hence would like to kick it out of the group.
> > >
> > > So I'd like to propose a slightly modified version of LeaveGroupRequest: instead of letting the static member consumer clients send the request themselves (which means we still need some hidden configs to turn it off, like we do today), how about just letting any other client send this request, since the LeaveGroupRequest only requires group.id and member.id? So back to your operational scenarios: if some static member has been found crashed and is not likely to come back, or we simply want to shrink the size of the group by shutting down some static members, we can use an admin client to send the LeaveGroupRequest after the instance has been completely shut down or crashed, to kick them out of the group and also trigger the rebalance.
> > >
> > > One issue, though, is that users may not know the member id required in the LeaveGroupRequest. To work around it, we can add the `group.instance.id` along with the member id and then allow the member id to be nullable. The coordinator logic would then be modified as follows: 1) if member.id is specified, ignore instance.id and always use member.id to find the member to kick out; 2) otherwise, try the instance.id to find the corresponding member.id and kick it out; 3) if none is found, reject with an error code.
> > >
> > > So in sum, the alternative changes are:
> > >
> > > a) Modify LeaveGroupRequest to add group.instance.id.
> > > b) Modify the coordinator logic to handle such requests on the broker side.
> > > c) Add a new API in AdminClient like "removeMemberFromGroup(groupId, instanceId)", which will be translated into a LeaveGroupRequest.
> > > d) [Optional] We can even batch the request by allowing "removeMemberFromGroup(groupId, list[instanceId])" and then make the `member.id` and `instance.id` fields of LeaveGroupRequest arrays instead of single entries.
> > > e) We can also remove the admin ConsumerRebalanceRequest for simplicity (why not? I'm paranoid about having as few request protocols as possible :), as it is not needed anymore with the above proposal.
> > >
> > >
> > > WDYT?
> > >
> > >
> > > Guozhang
> > >
> > > On Wed, Nov 28, 2018 at 5:34 AM Boyang Chen <bche...@outlook.com> wrote:
> > > >
> > > > Thanks Guozhang and Mayuresh for the follow-up! Answers are listed below.
> > > >
> > > > > 5. Regarding "So in summary, *the member will only be removed due to session timeout*. We shall remove it from both in-memory static member name mapping and member list." If the rebalance is invoked manually using the admin APIs, how long should the group coordinator wait for the members of the group to send a JoinGroupRequest for participating in the rebalance? How is a lagging consumer handled?
> > > >
> > > > Great question. Let's use the c1~c4 example here:
> > > >
> > > > 1. Consumers c1, c2, c3, c4 are in a stable state.
> > > > 2. c4 goes down and we detect this issue through client monitoring before the session timeout. We initiate a ConsumerRebalanceRequest.
> > > > 3. A rebalance kicks off, and after the rebalance timeout we keep the same assignment for c1~4 if c4's session timeout hasn't been reached.
> > > > 4. The group is back to stable with c1~4 (although c4 is actually offline).
> > > > 5. c4's session timeout is finally reached: another rebalance is triggered.
> > > >
> > > > For step 3, if the session timeout fires within the rebalance timeout, only c1~3 will participate in the rebalance. This is what we mean by saying "rebalance timeout shall not remove current members, only session timeout will do." As you can see, this is not an ideal scenario: we trigger an extra rebalance at step 5. In my reply to Guozhang I'm asking whether static members should still use LeaveGroupRequest to signal to the broker "I'm currently offline". Then, when we send the ConsumerRebalanceRequest to the broker, we will actually kick out c4 because it has already said it's offline, saving one or more additional rebalances later. This way the ConsumerRebalanceRequest will be more effective in judging the group status correctly, since we have more feedback from the client side.
> > > >
> > > > > - When we say that we would use invokeConsumerRebalance(groupId) to downscale, with the example in the above question, how will the GroupCoordinator know that c4 should be kicked out of the group, since we are trying to invoke rebalance proactively without waiting for c4's session timeout to expire? Should there be a way of telling the GroupCoordinator that consumer c4 has been kicked out of the groupId = "GroupA"?
> > > >
> > > > The previous proposal should suffice to answer this question 😊
> > > >
> > > > > - Also it looks like the statement "If the `member.id` uses UNKNOWN_MEMBER_NAME, we shall always generate a new member id and replace the one within current map, if `group.member.name` is known. Also once we are done with KIP-394 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request>, all the join group requests are requiring `member.id` to physically enter the consumer group. This way the latest joined " is incomplete. Can you take a look at this?
> > > > > Also, when we say "all the join group requests are requiring `member.id` to physically enter the consumer group.", since a newly started consumer will not have a "member.id", I assume you mean that once the GroupCoordinator assigns a member.id to the newly started consumer, it has to use it for any future JoinGroupRequests. Is my understanding correct?
> > > >
> > > > Thanks for catching it! And yes, we shall use one extra round trip between consumer and broker to inform the consumer of its newly allocated member id.
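The extra round trip described above can be modelled as follows. A member joining without an id is turned away once and handed a broker-generated id, then rejoins with that id. This is a minimal in-memory sketch. `ToyCoordinator`, the `MEMBER_ID_REQUIRED` result string and the id format are assumptions for illustration, not the broker's code or wire protocol. Treating the unknown member id as an empty string is also an assumption of the sketch.

```python
import uuid
from dataclasses import dataclass, field
from typing import Set, Tuple

UNKNOWN_MEMBER_ID = ""  # assumed "no id yet" sentinel for this sketch


@dataclass
class ToyCoordinator:
    """Hypothetical in-memory coordinator illustrating the two-step join."""
    pending: Set[str] = field(default_factory=set)  # ids handed out, not yet used
    members: Set[str] = field(default_factory=set)  # ids that have entered the group

    def join(self, member_id: str, client_id: str) -> Tuple[str, str]:
        if member_id == UNKNOWN_MEMBER_ID:
            # First round trip: allocate an id but do not admit the member yet.
            new_id = f"{client_id}-{uuid.uuid4()}"
            self.pending.add(new_id)
            return ("MEMBER_ID_REQUIRED", new_id)
        if member_id in self.pending or member_id in self.members:
            # Second round trip: the allocated id is presented; admit the member.
            self.pending.discard(member_id)
            self.members.add(member_id)
            return ("NONE", member_id)
        return ("UNKNOWN_MEMBER_ID", member_id)


def consumer_join(coordinator: ToyCoordinator, client_id: str) -> str:
    """Client side: retry once with the id the coordinator allocated."""
    error, member_id = coordinator.join(UNKNOWN_MEMBER_ID, client_id)
    if error == "MEMBER_ID_REQUIRED":
        error, member_id = coordinator.join(member_id, client_id)
    if error != "NONE":
        raise RuntimeError(error)
    return member_id
```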
> > > >
> > > > Next are the replies to Guozhang's comments:
> > > > 2) I once had a discussion about the LeaveGroupRequest for static members. The reason for not having it for static members is that we'd need to make it a configurable behavior as well (i.e. the likelihood that a static member may shut down but come back later may be even larger than the likelihood that a shut-down static member would not come back). Also, when a shutdown is complete, the instance cannot tell by itself whether or not it will come back. Hence letting a third party (think: an admin client used by K8s plugins) issue a request to indicate static member changes would be more plausible.
> > > >
> > > > I think having in the request an optional list of all the static members that are still in the group is a good idea (rather than the members to be removed, since the latter looks a bit less flexible to me). Remember we allow a group to have both static and dynamic members at the same time. So when receiving the request, we will only do the diff and add / remove the static members directly, while still letting the dynamic members try to re-join the group within the rebalance timeout.
> > > > I'm also in favor of storing all the in-group static members. In fact we could reuse the static membership mapping to store this information. Do you think that we should let static members send a leave group request to indicate their status of "leaving", and use ConsumerRebalanceRequest to trigger a rebalance without them? I'm suggesting we should remove those members when kicking off the rebalance, since we are shutting them down already.
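The "optional list of in-group static members" idea amounts to a set diff on the coordinator. This is a minimal sketch. It assumes the coordinator keeps a mapping from instance id to member id (the "static membership mapping" mentioned above), and that dynamic members are left to rejoin on their own. `diff_static_members` is a hypothetical name.

```python
from typing import Dict, Iterable, Set, Tuple


def diff_static_members(
    static_mapping: Dict[str, str], still_in_group: Iterable[str]
) -> Tuple[Set[str], Set[str]]:
    """Compare the coordinator's static mapping (instance id -> member id)
    with the static instance ids the request says are still in the group.

    Returns (to_remove, to_add). Dynamic members are not part of either set;
    they keep rejoining through the normal rebalance path."""
    requested = set(still_in_group)
    current = set(static_mapping)
    return current - requested, requested - current
```

The diff only says which static entries to drop or expect. How an added instance id is admitted before it has a member id is left open in the thread, so the sketch does not attempt it.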
> > > >
> > > > 3) Personally I favor "ids" over "names" :) Since we already have some "ids", it sounds more consistent. Plus, on the producer side we have a `transactional.id` whose semantics are a bit similar to this one, i.e. uniquely distinguishing a client which may come and go but needs to persist over multiple "instance lifetimes".
> > > > Sure, we have enough votes for ids 😊 I will finalize the name as `group.instance.id`. Does that sound good?
> > > >
> > > > Best,
> > > > Boyang
> > > > ________________________________
> > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > Sent: Wednesday, November 28, 2018 4:51 AM
> > > > To: dev
> > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id
> > > >
> > > > Regarding Jason's question and Boyang's responses:
> > > >
> > > > 2) I once had a discussion about the LeaveGroupRequest for static members. The reason for not having it for static members is that we'd need to make it a configurable behavior as well (i.e. the likelihood that a static member may shut down but come back later may be even larger than the likelihood that a shut-down static member would not come back). Also, when a shutdown is complete, the instance cannot tell by itself whether or not it will come back. Hence letting a third party (think: an admin client used by K8s plugins) issue a request to indicate static member changes would be more plausible.
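The third-party request discussed here later takes a concrete shape in Guozhang's Nov 29 proposal. Under that proposal member.id is nullable, `group.instance.id` is the fallback, and an error is returned if neither matches. A coordinator could resolve such a request roughly as follows. This is a minimal Python sketch. `LeaveTarget`, the result strings and the plain dict/set state are hypothetical stand-ins for coordinator state, not Kafka's implementation.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set


@dataclass
class LeaveTarget:
    member_id: Optional[str] = None    # nullable, per the proposal
    instance_id: Optional[str] = None  # group.instance.id


def resolve_leave(
    targets: List[LeaveTarget],
    members: Set[str],
    static_mapping: Dict[str, str],
) -> Dict[int, str]:
    """Handle a (batched) leave request. For each entry, return
    "REMOVED:<member.id>" or an error string; matched members are removed."""
    results: Dict[int, str] = {}
    for i, t in enumerate(targets):
        if t.member_id is not None:
            # 1) member.id given: ignore instance.id entirely.
            member_id = t.member_id if t.member_id in members else None
        elif t.instance_id is not None:
            # 2) fall back to the instance id -> member id mapping.
            member_id = static_mapping.get(t.instance_id)
        else:
            member_id = None
        if member_id is None:
            # 3) nothing found: reject this entry with an error code.
            results[i] = "UNKNOWN_MEMBER_ID"
            continue
        members.discard(member_id)
        # Drop any static mapping entry that points at the removed member.
        for inst, mid in list(static_mapping.items()):
            if mid == member_id:
                del static_mapping[inst]
        results[i] = f"REMOVED:{member_id}"
    return results
```

Reporting a result per entry, rather than failing the whole batch, is a choice made for the sketch. The thread does not say how partial failures in a batched request should be reported.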
> > > >
> > > > I think having in the request an optional list of all the static members that are still in the group is a good idea (rather than the members to be removed, since the latter looks a bit less flexible to me). Remember we allow a group to have both static and dynamic members at the same time. So when receiving the request, we will only do the diff and add / remove the static members directly, while still letting the dynamic members try to re-join the group within the rebalance timeout.
> > > >
> > > > 3) Personally I favor "ids" over "names" :) Since we already have some "ids", it sounds more consistent. Plus, on the producer side we have a `transactional.id` whose semantics are a bit similar to this one, i.e. uniquely distinguishing a client which may come and go but needs to persist over multiple "instance lifetimes".
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Tue, Nov 27, 2018 at 10:00 AM Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > >
> > > > > Hi Boyang,
> > > > >
> > > > > Thanks for the replies. Please find the follow-up queries below.
> > > > >
> > > > > 5. Regarding "So in summary, *the member will only be removed due to session timeout*. We shall remove it from both in-memory static member name mapping and member list." If the rebalance is invoked manually using the admin APIs, how long should the group coordinator wait for the members of the group to send a JoinGroupRequest for participating in the rebalance? How is a lagging consumer handled?
> > > > > The plan is to disable member kick-out when rebalance.timeout is reached, so basically we are not "waiting" for any join group request from existing members; we shall just rebalance based on what we currently have within the group metadata. A lagging consumer will trigger a rebalance later if session timeout > rebalance timeout.
> > > > >
> > > > > > Just wanted to understand this better. Let's take an example: say we have a consumer group "GroupA" with 4 consumers c1, c2, c3, c4. Everything is running fine and suddenly c4's host has issues and goes down. Now we notice that we can still operate with c1, c2, c3 and don't want to wait for c4 to come back up. We use the admin API "invokeConsumerRebalance("GroupA")". Now the GroupCoordinator will ask the members c1, c2, c3 to join the group again (in their heartbeat response) as the first step of the rebalance. Now let's say that c1 and c2 immediately send a JoinGroupRequest but c3 is delayed. At this stage, if we are not "waiting" on any join group request, a few things can happen:
> > > > > >
> > > > > > - c4's partitions are distributed only among c1 and c2. c3 maintains its original assignment. c1 and c2 will start processing the newly assigned partitions.
> > > > > >
> > > > > > OR
> > > > > >
> > > > > > - c4's partitions are distributed among c1, c2, c3. c1 and c2 start processing the newly assigned partitions. c3 learns about its newly assigned partitions later, when it sends the (delayed) JoinGroupRequest.
> > > > > > > > > > > > OR > > > > > > > > > > > > - Will the rebalance do a complete reassignment, where c1, c2, > > c3 > > > > have > > > > > > to give up there partitions and all the partitions belonging > to > > > c1, > > > > > c2, c3, > > > > > > c4 will be redistributed among c1, c2, c3 ? If this is the > case, > > > the > > > > > > GroupCoordinator needs to give some buffer time for c1, c2, c3 > > to > > > > > revoke > > > > > > there partitions and rejoin the group. > > > > > > > > > > > > This is as per my understanding of how the KIP would work without > > > > > changing > > > > > > the underlying group coordination workflow. Please correct me if > I > > > > > > misunderstood something here. > > > > > > > > > > > > > > > > > > > > > - When we say that we would use invokeConsumerRebalance(groupId) to > > > down > > > > > scale, with the example in the above question, how will the > > > > > GroupCoordinator know that c4 should be kicked out of the group > since > > > we > > > > > are trying to invoke rebalance proactively without waiting for c4's > > > > session > > > > > time out to expire. Should there be a way of telling the > > > GroupCoordinator > > > > > that consumer c4 has been kicked out of the groupId = "GroupA"? > > > > > > > > > > - Also it looks like the statement "If the `member.id` uses > > > > > UNKNOWN_MEMBER_NAME, we shall always generate a new member id and > > > replace > > > > > the one within current map, if `group.member.name` is known. 
Also > > once > > > > we > > > > > are done with KIP-394 > > > > > < > > > > > > > > > > > > > > > https://nam04.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-394%253A%2BRequire%2Bmember.id%2Bfor%2Binitial%2Bjoin%2Bgroup%2Brequest&data=02%7C01%7C%7C99d1b807ce7e4fd280ac08d65718dcda%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636792161627905154&sdata=AWHFruanfD%2BR2S4thCniTxKUBTo9fziEzEDeefxskrs%3D&reserved=0 > > > > > >, > > > > > all the join group requests are requiring `member.id` to > physically > > > > enter > > > > > the consumer group. This way the latest joined " is incomplete. Can > > you > > > > > take a look at this? > > > > > Also when we say "all the join group requests are requiring ` > > member.id > > > ` > > > > to > > > > > physically enter the consumer group." because a newly started > > consumer > > > > will > > > > > not have a "member.id", I assume you mean, once the > GroupCoordinator > > > > > assigns a member.id to the newly started consumer, it has to use > it > > > for > > > > > any > > > > > future JoinGroupRequests. Is my understanding correct? > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > Mayuresh > > > > > > > > > > On Mon, Nov 26, 2018 at 9:20 PM Boyang Chen <bche...@outlook.com> > > > wrote: > > > > > > > > > > > Thanks Mayuresh and Jason for your follow-ups! Let me try to > answer > > > > both > > > > > > in this reply. > > > > > > > > > > > > > > > > > > > 1. Do you intend to have member.id is a static config like > > > > > > member.name > > > > > > > after KIP-345 and KIP-394? > > > > > > > > > > > > No, we shall only rely on broker to allocate member.id for the > > > > consumer > > > > > > instances. FYI, I already > > > > > > > > > > > > started the discussion thread for KIP-394 😊 > > > > > > > > > > > > > 2. Regarding "On client side, we add a new config called > > > > MEMBER_NAME > > > > > > in > > > > > > > ConsumerConfig. 
On consumer service init, if the MEMBER_NAME > > > > config > > > > > is > > > > > > > set, > > > > > > > we will put it in the initial join group request to identify > > > > itself > > > > > > as a > > > > > > > static member (static membership); otherwise, we will still > > send > > > > > > > UNKNOWN_MEMBER_ID to ask broker for allocating a new random > ID > > > > > > (dynamic > > > > > > > membership)." > > > > > > > - What is the value of member_id sent in the first > > > > > JoinGroupRequest > > > > > > > when member_name is set (using static rebalance)? Is it > > > > > > > UNKNOW_MEMBER_ID? > > > > > > > > > > > > Yes, we could only use unknown member id. Actually this part of > the > > > > > > proposal is outdated, > > > > > > > > > > > > let me do another audit of the whole doc. Basically, it is > > currently > > > > > > impossible to send `member.id` > > > > > > > > > > > > when consumer restarted. Sorry for the confusions! > > > > > > > > > > > > > 3. Regarding "we are requiring member.id (if not unknown) > to > > > > match > > > > > > the > > > > > > > value stored in cache, otherwise reply MEMBER_ID_MISMATCH. > The > > > > edge > > > > > > case > > > > > > > that if we could have members with the same `member.name` > > (for > > > > > > example > > > > > > > mis-configured instances with a valid member.id but added a > > > used > > > > > > member > > > > > > > name on runtime). When member name has duplicates, we could > > > refuse > > > > > > join > > > > > > > request from members with an outdated `member.id` (since we > > > > update > > > > > > the > > > > > > > mapping upon each join group request). In an edge case where > > the > > > > > > client > > > > > > > hits this exception in the response, it is suggesting that > > some > > > > > other > > > > > > > consumer takes its spot." > > > > > > > - The part of "some other consumer takes the spot" would > be > > > > > > > intentional, right? 
> Also when you say "The edge case that if we could have members with the same `member.name` (for example mis-configured instances *with a valid member.id* but added a used member name on runtime).", what do you mean by *valid member id* here? Does it mean that there exists a mapping of member.name to member.id like *MemberA -> id1* on the GroupCoordinator and this consumer is trying to join with *member.name = MemberB and member.id = id1*?

I would take Jason's advice that each time an unknown member joins the group, the broker will always assign a new and unique id to track its identity. In this way, a consumer with a duplicate member name will be fenced.

> 4. Depending on your explanation for point 2 and point 3 above regarding returning MEMBER_ID_MISMATCH on having a matching member_name but unknown member_id, if the consumer sends "UNKNOWN_MEMBER_ID" on the first JoinGroupRequest and relies on the GroupCoordinator to give it a member_id, is the consumer supposed to remember member_id for joinGroupRequests? If yes, how are restarts handled?

As explained above, we shall not materialize the member.id. Instead, we need to rely on the broker to allocate a unique id for the consumer, just like what we have now.

> 5. Regarding "So in summary, *the member will only be removed due to session timeout*. We shall remove it from both in-memory static member name mapping and member list."
> - If the rebalance is invoked manually using the admin APIs, how long should the group coordinator wait for the members of the group to send a JoinGroupRequest for participating in the rebalance? How is a lagging consumer handled?

The plan is to disable member kick-out when rebalance.timeout is reached, so basically we are not "waiting" for any join group request from existing members; we shall just rebalance based on what we currently have within the group metadata. A lagging consumer will trigger a rebalance later if session timeout > rebalance timeout.

> 6. Another detail to take care of is that we need to automatically take the hash of group id so that we know which broker to send this request to.
> - I assume this should be the same as the way we find the coordinator today, right? If yes, should we specify it in the KIP?

Yep, it is. Added FindCoordinatorRequest logic to the script.

> 7. Are there any specific failure scenarios when you say "other potential failure cases."? It would be good to mention them explicitly, if you think there are any.

Nah, I'm going to remove it because it seems to cause more confusion than it clarifies my assumption, which is "there could be other failure cases that I can't enumerate now" 😊

> 8. It would be good to have a rollback plan as you have for roll forward in the KIP.

Great suggestion! Added a simple rollback plan.

Next is answering Jason's suggestions:

1. This may be the same thing that Mayuresh is asking about. I think the suggestion in the KIP is that if a consumer sends JoinGroup with a member name, but no member id, then we will return the current member id associated with that name. It seems in this case that we wouldn't be able to protect from having two consumers active with the same configured member.name? For example, imagine that we had a consumer with member.name=A which is assigned member.id=1. Suppose it becomes a zombie and a new instance starts up with member.name=A.
If it is also assigned member.id=1, then how can we detect the zombie if it comes back to life? Both instances will have the same member.id.

The goal is to avoid a rebalance on a rolling restart, but we still need to fence previous members. I am wondering if we can generate a new member.id every time we receive a request from a static member with an unknown member id. If the old instance with the same member.name attempts any operation, then it will be fenced with an UNKNOWN_MEMBER_ID error. As long as the subscription of the new instance hasn't changed, then we can skip the rebalance and return the current assignment without forcing a rebalance.

The trick to making this work is in the error handling of the zombie consumer. If the zombie simply resets its member.id and rejoins to get a new one upon receiving the UNKNOWN_MEMBER_ID error, then it would end up fencing the new member. We want to avoid this. There needs to be an expectation for static members that the member.id of a static member will not be changed except when a new member with the same member.name joins the group. Then we can treat UNKNOWN_MEMBER_ID as a fatal error for consumers with static member names.

Yep, I like this idea!
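A minimal sketch of the fencing scheme Jason describes, assuming the coordinator keeps one in-memory map from the static member name to the member.id it most recently handed out. Every name here (`StaticMemberRegistry`, `JoinResult`, `GroupError`, `check`, `onFencedError`) is hypothetical and not Kafka's actual GroupCoordinator code; the empty-string sentinel for an unknown member id is also an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical error codes for this sketch only.
enum GroupError { NONE, UNKNOWN_MEMBER_ID }

final class JoinResult {
    final GroupError error;
    final String memberId;

    JoinResult(GroupError error, String memberId) {
        this.error = error;
        this.memberId = memberId;
    }
}

// Coordinator side (hypothetical): member name -> latest member.id handed out for that name.
final class StaticMemberRegistry {
    static final String UNKNOWN_MEMBER_ID = ""; // assumed sentinel

    private final Map<String, String> latestIdByName = new HashMap<>();

    // A static member joining without a member.id always gets a fresh id.
    // Overwriting the map entry is what fences any older holder of the same name.
    JoinResult join(String memberName, String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            String fresh = memberName + "-" + UUID.randomUUID();
            latestIdByName.put(memberName, fresh);
            // If the subscription is unchanged, the coordinator could return the
            // current assignment here instead of rebalancing (not modeled).
            return new JoinResult(GroupError.NONE, fresh);
        }
        return check(memberName, memberId);
    }

    // Heartbeat, SyncGroup, OffsetCommit, ...: only the latest id for the name is accepted.
    JoinResult check(String memberName, String memberId) {
        String latest = latestIdByName.get(memberName);
        GroupError error = memberId.equals(latest) ? GroupError.NONE : GroupError.UNKNOWN_MEMBER_ID;
        return new JoinResult(error, memberId);
    }

    // Client side: a static member must NOT reset its id and rejoin on UNKNOWN_MEMBER_ID,
    // otherwise the zombie would fence the new instance. Treat it as fatal instead.
    static void onFencedError(String memberName, GroupError error) {
        if (error == GroupError.UNKNOWN_MEMBER_ID) {
            throw new IllegalStateException("Static member " + memberName + " was fenced by a newer instance");
        }
    }
}
```

The point of the design is that the broker never reuses a member.id for a new join, so the owner of a name is always whoever joined last, and the client-side rule stops the zombie from taking the name back.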
Continuing to give out a fresh member.id when facing an anonymous request will definitely prevent processing bugs due to duplicate consumers; however, I don't think I fully understand the third paragraph, where you mentioned "There needs to be an expectation for static members that the member.id of a static member will not be changed except when a new member with the same member.name joins the group." How do you plan to know whether this member is a new member or an old member? I feel that even if a zombie consumer takes ownership, it should be detected very quickly (as the MISMATCH_ID exception causes the original consumer instance to die), and the end user will start to fix it right away. Is there any similar logic we applied in fencing duplicate `transactional.id`?

2. The mechanics of the ConsumerRebalance API seem unclear to me. As far as I understand it, it is used for scaling down a consumer group and somehow bypasses normal session timeout expiration. I am wondering how critical this piece is and whether we can leave it for future work. If not, then it would be helpful to elaborate on its implementation. How would the coordinator know which members to kick out of the group?

This API is needed when we need to trigger a rebalance immediately instead of waiting for the session timeout or rebalance timeout (emergent scale up/down).
It is necessary to have it for management purposes because users can choose when to trigger a rebalance fairly freely, gaining more client-side control.

Meanwhile, I see your point that we need the ability to quickly kick out members that we plan to scale down (as the rebalance timeout no longer kicks any offline member out of the group). I will think about adding an optional list of members that are ready to be removed.

Another idea is to let static members send `LeaveGroupRequest` when they are going offline (either scale down or bouncing), and the broker will cache this information as "OfflineMembers" without triggering a rebalance. When handling a ConsumerRebalanceRequest, the broker will kick the static members that are currently offline and trigger a rebalance immediately. How does this plan sound?

3. I've been holding back on mentioning this, but I think we should reconsider the name `member.name`. I think we want something that suggests its expectation of uniqueness in the group. How about `group.instance.id` to go along with `group.id`?

Yeah, Dong and Stanislav also mentioned this naming. I personally buy into the namespace idea, and since we already use `member.name` in a lot of contexts, I decided to rename the config to `group.member.name`, which should be sufficient for solving all the concerns we have now. Sounds good?

Thank you for your great suggestions!
Let me know if my reply makes sense here.

Best,
Boyang

________________________________
From: Jason Gustafson <ja...@confluent.io>
Sent: Tuesday, November 27, 2018 7:51 AM
To: dev
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hi Boyang,

Thanks for the updates. Looks like we're headed in the right direction and clearly the interest that this KIP is receiving shows how strong the motivation is!

I have a few questions:

1. This may be the same thing that Mayuresh is asking about. I think the suggestion in the KIP is that if a consumer sends JoinGroup with a member name, but no member id, then we will return the current member id associated with that name. It seems in this case that we wouldn't be able to protect from having two consumers active with the same configured member.name? For example, imagine that we had a consumer with member.name=A which is assigned member.id=1. Suppose it becomes a zombie and a new instance starts up with member.name=A. If it is also assigned member.id=1, then how can we detect the zombie if it comes back to life? Both instances will have the same member.id.

The goal is to avoid a rebalance on a rolling restart, but we still need to fence previous members. I am wondering if we can generate a new member.id every time we receive a request from a static member with an unknown member id.
If the old instance with the same member.name attempts any operation, then it will be fenced with an UNKNOWN_MEMBER_ID error. As long as the subscription of the new instance hasn't changed, then we can skip the rebalance and return the current assignment without forcing a rebalance.

The trick to making this work is in the error handling of the zombie consumer. If the zombie simply resets its member.id and rejoins to get a new one upon receiving the UNKNOWN_MEMBER_ID error, then it would end up fencing the new member. We want to avoid this. There needs to be an expectation for static members that the member.id of a static member will not be changed except when a new member with the same member.name joins the group. Then we can treat UNKNOWN_MEMBER_ID as a fatal error for consumers with static member names.

2. The mechanics of the ConsumerRebalance API seem unclear to me. As far as I understand it, it is used for scaling down a consumer group and somehow bypasses normal session timeout expiration. I am wondering how critical this piece is and whether we can leave it for future work. If not, then it would be helpful to elaborate on its implementation. How would the coordinator know which members to kick out of the group?

3. I've been holding back on mentioning this, but I think we should reconsider the name `member.name`. I think we want something that suggests its expectation of uniqueness in the group. How about `group.instance.id` to go along with `group.id`?
Thanks,
Jason

On Mon, Nov 26, 2018 at 10:18 AM Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

Hi Boyang,

Thanks a lot for replying to all the queries and discussions here, so patiently. Really appreciate it.

Had a few questions and suggestions after rereading the current version of the KIP:

1. Do you intend to have member.id be a static config like member.name after KIP-345 and KIP-394?
2. Regarding "On client side, we add a new config called MEMBER_NAME in ConsumerConfig. On consumer service init, if the MEMBER_NAME config is set, we will put it in the initial join group request to identify itself as a static member (static membership); otherwise, we will still send UNKNOWN_MEMBER_ID to ask broker for allocating a new random ID (dynamic membership)."
   - What is the value of member_id sent in the first JoinGroupRequest when member_name is set (using static rebalance)? Is it UNKNOWN_MEMBER_ID?
3. Regarding "we are requiring member.id (if not unknown) to match the value stored in cache, otherwise reply MEMBER_ID_MISMATCH. The edge case that if we could have members with the same `member.name` (for example mis-configured instances with a valid member.id but added a used member name on runtime).
   When member name has duplicates, we could refuse join request from members with an outdated `member.id` (since we update the mapping upon each join group request). In an edge case where the client hits this exception in the response, it is suggesting that some other consumer takes its spot."
   - The part of "some other consumer takes the spot" would be intentional, right? Also when you say "The edge case that if we could have members with the same `member.name` (for example mis-configured instances *with a valid member.id* but added a used member name on runtime).", what do you mean by *valid member id* here?
     Does it mean that there exists a mapping of member.name to member.id like *MemberA -> id1* on the GroupCoordinator and this consumer is trying to join with *member.name = MemberB and member.id = id1*?
4. Depending on your explanation for point 2 and point 3 above regarding returning MEMBER_ID_MISMATCH on having a matching member_name but unknown member_id, if the consumer sends "UNKNOWN_MEMBER_ID" on the first JoinGroupRequest and relies on the GroupCoordinator to give it a member_id, is the consumer supposed to remember member_id for joinGroupRequests? If yes, how are restarts handled?
5. Regarding "So in summary, *the member will only be removed due to session timeout*. We shall remove it from both in-memory static member name mapping and member list."
   - If the rebalance is invoked manually using the admin APIs, how long should the group coordinator wait for the members of the group to send a JoinGroupRequest for participating in the rebalance?
     How is a lagging consumer handled?
6. Another detail to take care of is that we need to automatically take the hash of group id so that we know which broker to send this request to.
   - I assume this should be the same as the way we find the coordinator today, right? If yes, should we specify it in the KIP?
7. Are there any specific failure scenarios when you say "other potential failure cases."? It would be good to mention them explicitly, if you think there are any.
8. It would be good to have a rollback plan as you have for roll forward in the KIP.

Thanks,

Mayuresh

On Mon, Nov 26, 2018 at 8:17 AM Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

Hi Boyang,

Do you have a discuss thread for KIP-394 that you mentioned here?

Thanks,

Mayuresh

On Mon, Nov 26, 2018 at 4:52 AM Boyang Chen <bche...@outlook.com> wrote:

Hey Dong, thanks for the follow-up here!

1) It is not very clear to the user what is the difference between member.name and client.id as both seem to be used to identify the consumer.
I am wondering if it would be more intuitive to name it group.member.name (preferred choice since it matches the current group.id config name) or rebalance.member.name to explicitly show that the id is solely used for rebalance.

Great question. I feel `member.name` is enough to explain itself; it does not seem very helpful to make the config name longer. Comparing `name` with `id` gives users the impression that they control it with a customized rule, rather than having it decided by the library.

2) In the interface change section it is said that GroupMaxSessionTimeoutMs will be changed to 30 minutes. It seems to suggest that we will change the default value of this config. It does not seem necessary to increase the time of consumer failure detection when user doesn't use static membership. Also, say static membership is enabled, then this default config change will cause a partition to be unavailable for consumption for 30 minutes if there is hard consumer failure, which seems to be worse experience than having unnecessary rebalance (when this timeout is small), particularly for new users of Kafka. Could you explain more why we should make this change?

We are not changing the default session timeout value. We are just changing the cap we are enforcing on the session timeout max value.
So this change does not affect what kind of membership the end user is using, and loosening the cap gives end users more flexibility in the trade-off between liveness and stability.

3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER into one error? It seems that these two errors are currently handled by the consumer in the same way. And we also don't expect MEMBER_ID_MISMATCH to happen. Thus it is not clear what is the benefit of having two errors.

I agree that we should remove the DUPLICATE_STATIC_MEMBER error, because with KIP-394 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member+id+for+initial+join+group+request> we will automatically fence all join requests with UNKNOWN_MEMBER_ID.

4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains member name which is already in the consumer group, however the member id was missing".
After a consumer is restarted, it will send a JoinGroupRequest with an existing memberName (as the coordinator has not expired this member from the memory) and memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted across consumer restart in the consumer side). Does it mean that JoinGroupRequest from a newly restarted consumer will always be rejected until the sessionTimeoutMs has passed?

Same answer as question 3). This part of the logic shall be removed from the proposal.

5) It seems that we always add two methods to the interface org.apache.kafka.clients.admin.AdminClient.java, one with options and the other without option. Could this be specified in the interface change section?

Sounds good! Added both methods.

6) Do we plan to have off-the-shelf command line tool for SRE to trigger rebalance? If so, we probably want to specify the command line tool interface similar to <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts>.

Added the script.
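For the restart question in item 4, a rough sketch of the two-step join that KIP-394 introduces: a JoinGroup carrying an unknown member id is answered with a MEMBER_ID_REQUIRED error plus a broker-generated id, and the client immediately rejoins with that id, so a restarted consumer is not rejected for the whole session timeout. The classes below are a hypothetical in-memory model, not Kafka's protocol classes, and the `client.id` + UUID id format is an assumption of this sketch.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical error codes for this sketch only.
enum Kip394Error { NONE, MEMBER_ID_REQUIRED, UNKNOWN_MEMBER_ID }

final class Kip394Response {
    final Kip394Error error;
    final String memberId;

    Kip394Response(Kip394Error error, String memberId) {
        this.error = error;
        this.memberId = memberId;
    }
}

final class Kip394Coordinator {
    static final String UNKNOWN_MEMBER_ID = ""; // assumed sentinel

    private final Set<String> pendingIds = new HashSet<>(); // ids handed out, not yet used to join
    private final Set<String> memberIds = new HashSet<>();  // ids that completed a join

    Kip394Response joinGroup(String clientId, String memberId) {
        if (UNKNOWN_MEMBER_ID.equals(memberId)) {
            // Step 1: do not admit the member yet; hand back an id and ask it to rejoin.
            String generated = clientId + "-" + UUID.randomUUID();
            pendingIds.add(generated);
            return new Kip394Response(Kip394Error.MEMBER_ID_REQUIRED, generated);
        }
        // Step 2: only ids this coordinator generated (or already admitted) are accepted.
        if (pendingIds.remove(memberId) || memberIds.contains(memberId)) {
            memberIds.add(memberId);
            return new Kip394Response(Kip394Error.NONE, memberId);
        }
        return new Kip394Response(Kip394Error.UNKNOWN_MEMBER_ID, memberId);
    }
}

final class Kip394Client {
    // Client side of the handshake: at most one retry, using the id the broker returned.
    static String join(Kip394Coordinator coordinator, String clientId) {
        Kip394Response response = coordinator.joinGroup(clientId, Kip394Coordinator.UNKNOWN_MEMBER_ID);
        if (response.error == Kip394Error.MEMBER_ID_REQUIRED) {
            response = coordinator.joinGroup(clientId, response.memberId);
        }
        if (response.error != Kip394Error.NONE) {
            throw new IllegalStateException("JoinGroup failed: " + response.error);
        }
        return response.memberId;
    }
}
```

A real coordinator would also need to expire pending ids that are never used; that bookkeeping is left out here.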
7) Would it be simpler to replace name "forceStaticRebalance" with "invokeConsumerRebalance"? It is not very clear what is the extra meaning of the word "force" as compared to "trigger" or "invoke". And it seems simpler to allow this API to trigger rebalance regardless of whether consumer is configured with memberName.

Sounds good. Right now I feel that for both static and dynamic membership it is more manageable to introduce the consumer rebalance method through the admin client API.

8) It is not very clear how the newly added AdminClient API triggers rebalance. For example, does it send a request? Can this be explained in the KIP?

Sure, I will add more details to the API.

Thanks again for the helpful suggestions!

Best,
Boyang

________________________________
From: Dong Lin <lindon...@gmail.com>
Sent: Saturday, November 24, 2018 2:54 PM
To: dev
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hey Boyang,

Thanks for the update! Here are some followup comments:

1) It is not very clear to the user what is the difference between member.name and client.id as both seem to be used to identify the consumer.
I am wondering if it would be more intuitive to name it group.member.name (preferred choice since it matches the current group.id config name) or rebalance.member.name to explicitly show that the id is solely used for rebalance.

2) In the interface change section it is said that GroupMaxSessionTimeoutMs will be changed to 30 minutes. It seems to suggest that we will change the default value of this config. It does not seem necessary to increase the time of consumer failure detection when user doesn't use static membership. Also, say static membership is enabled, then this default config change will cause a partition to be unavailable for consumption for 30 minutes if there is hard consumer failure, which seems to be worse experience than having unnecessary rebalance (when this timeout is small), particularly for new users of Kafka. Could you explain more why we should make this change?

3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER into one error? It seems that these two errors are currently handled by the consumer in the same way. And we also don't expect MEMBER_ID_MISMATCH to happen. Thus it is not clear what is the benefit of having two errors.
4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains member name which is already in the consumer group, however the member id was missing". After a consumer is restarted, it will send a JoinGroupRequest with an existing memberName (as the coordinator has not expired this member from the memory) and memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted across consumer restart in the consumer side). Does it mean that JoinGroupRequest from a newly restarted consumer will always be rejected until the sessionTimeoutMs has passed?

5) It seems that we always add two methods to the interface org.apache.kafka.clients.admin.AdminClient.java, one with options and the other without option. Could this be specified in the interface change section?

6) Do we plan to have off-the-shelf command line tool for SRE to trigger rebalance? If so, we probably want to specify the command line tool interface similar to <https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts>.
7) Would it be simpler to replace name "forceStaticRebalance" with "invokeConsumerRebalance"? It is not very clear what is the extra meaning of the word "force" as compared to "trigger" or "invoke". And it seems simpler to allow this API to trigger rebalance regardless of whether consumer is configured with memberName.

8) It is not very clear how the newly added AdminClient API triggers rebalance. For example, does it send a request? Can this be explained in the KIP?

Thanks,
Dong

On Thu, Nov 22, 2018 at 6:37 AM Boyang Chen <bche...@outlook.com> wrote:

Hey Mayuresh,

Thanks for your feedback! I will try to do another checklist here.

> By this you mean, even if the application has not called KafkaConsumer.poll() within session timeout, it will not be sending the LeaveGroup request, right?

Yep, it's true: we will prevent clients from sending a leave group request when they are set with `member.name`.

> When is the member.name removed from this map?

Good question. Within static membership, we will only kick out a member due to session timeout. Let me update the KIP to clearly assert that.
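A sketch of the removal rules described in this thread, under hypothetical names (`StaticGroupSketch`, `onRebalanceTimeout`, `expireSessions`, `shouldSendLeaveGroup` are illustrative, not Kafka code): a static member leaves the group only when its session times out, the rebalance timeout merely closes the prepare-rebalance stage without evicting anyone, and a consumer configured with `member.name` skips LeaveGroup on shutdown.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StaticGroupSketch {
    private final long sessionTimeoutMs;
    // member.name -> timestamp of the last heartbeat seen from that member
    private final Map<String, Long> lastHeartbeatMs = new LinkedHashMap<>();

    StaticGroupSketch(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    void heartbeat(String memberName, long nowMs) {
        lastHeartbeatMs.put(memberName, nowMs);
    }

    boolean contains(String memberName) {
        return lastHeartbeatMs.containsKey(memberName);
    }

    // Rebalance timeout fired: close the prepare-rebalance stage and assign over the
    // members already in the group metadata. Members that did not rejoin are NOT evicted.
    List<String> onRebalanceTimeout() {
        return new ArrayList<>(lastHeartbeatMs.keySet());
    }

    // The only removal path for a static member: its session expired.
    List<String> expireSessions(long nowMs) {
        List<String> expired = new ArrayList<>();
        lastHeartbeatMs.entrySet().removeIf(e -> {
            boolean dead = nowMs - e.getValue() > sessionTimeoutMs;
            if (dead) {
                expired.add(e.getKey());
            }
            return dead;
        });
        return expired;
    }

    // Client side: with member.name set, a shutting-down consumer does not send LeaveGroup,
    // so a quick bounce within the session timeout does not trigger a rebalance.
    static boolean shouldSendLeaveGroup(String memberNameOrNull) {
        return memberNameOrNull == null;
    }
}
```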
> How is this case (missing member id) handled on the client side? What is the application that is using the KafkaConsumer supposed to do in this scenario?

I have extended the two exceptions within join group response V4. Basically I define both corresponding actions to be immediately failing the client application, because so far it is unknown what kind of client issue could trigger them. After the first version, we will keep enhancing the error handling logic!

> This would mean that it might take more time to detect unowned topic partitions and may cause delay for applications that perform data mirroring tasks. I discussed this with our SRE and we have a suggestion to make here as listed below separately.

The goal of extending the session timeout cap is for users with good client-side monitoring tools that could auto-heal dead consumers very fast. So it is optional (and up to each user) to extend the session timeout to a reasonable number for different client scenarios.

> you meant remove unjoined members of the group, right?

Yep, there is a typo. Thanks for catching this!
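To make the session-timeout cap concrete: the broker bounds each consumer's `session.timeout.ms` by the broker configs `group.min.session.timeout.ms` and `group.max.session.timeout.ms` and rejects a JoinGroup outside that range with INVALID_SESSION_TIMEOUT. Raising the cap only widens the range a consumer may opt into; the consumer's own `session.timeout.ms` still decides how long failure detection takes. The helper below is a hypothetical restatement of that check, not broker code, and treating both bounds as inclusive is an assumption.

```java
final class SessionTimeoutCap {
    enum Result { OK, INVALID_SESSION_TIMEOUT }

    // Range check applied to the session timeout carried in a JoinGroup request (sketch).
    static Result validate(int sessionTimeoutMs, int groupMinSessionTimeoutMs, int groupMaxSessionTimeoutMs) {
        if (sessionTimeoutMs < groupMinSessionTimeoutMs || sessionTimeoutMs > groupMaxSessionTimeoutMs) {
            return Result.INVALID_SESSION_TIMEOUT;
        }
        return Result.OK;
    }
}
```

For example, with a minimum of 6 seconds, a consumer asking for 20 minutes is rejected under a 5-minute cap and accepted under a 30-minute cap, while a consumer asking for 10 seconds is accepted under either.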
> What do you mean by "Internally we would optimize this logic by having rebalance timeout only in charge of stopping prepare rebalance stage, without removing non-responsive members immediately." There would not be a full rebalance if the lagging consumer sent a JoinGroup request later, right? If yes, can you highlight this in the KIP?

No, there won't be. We want to limit the rebalance timeout to serving only as a timer that ends the prepare-rebalance stage. This way, late-joining static members will not trigger further rebalances as long as they are within the session timeout. I added your highlight to the KIP!

> The KIP talks about the scale up scenario but it's not quite clear how we handle it. Are we adding a separate "expansion.timeout" or are we adding a status "learner"? Can you shed more light on how this is handled in the KIP, if it's handled?

Updated the KIP: we shall not cover the scale up case in KIP-345, because we believe the client side can handle this logic better.

> I think Jason had brought this up earlier about having a way to say how many members/consumer hosts you are choosing to have in the consumer group.
> If we can do this, then in case of mirroring applications we can do this:
> Let's say we have a mirroring application that consumes from Kafka cluster A and produces to Kafka cluster B.
> Depending on the data and the Kafka cluster configuration, Kafka service providers can set a mirroring group saying that it will take, for example, 300 consumer hosts/members to achieve the desired throughput and latency for mirroring, and can have an additional 10 consumer hosts as spares in the same group.
> So the first 300 members/consumers to join the group will start mirroring the data from Kafka cluster A to Kafka cluster B.
> The remaining 10 consumer members can sit idle.
> The moment one of the consumers (for example, consumer number 54) from the first 300 members goes out of the group (crosses the session timeout), it (the groupCoordinator) can just assign the topicPartitions from consumer member 54 to one of the spare hosts.
> Once consumer member 54 comes back up, it can rejoin as part of the spare pool.
> This enables us to have lower session timeouts and low-latency mirroring, in cases where the service providers are OK with having spare hosts.
> This would mean that we would tolerate n consumer members leaving and rejoining the group and still provide low latency as long as n <= number of spare consumers.
> If there are no spare hosts available, we can fall back to the idea as described in the KIP.

Great idea! In fact, on top of static membership we could later introduce APIs to set hard-coded client ids for the group and replace a dead host, or, as you proposed, define spare hosts, which I understand as a hot backup. I will put both Jason's and your suggestions into a separate section called "Future works". Note that IMO this spare host idea may also be solvable through the rebalance protocol.

Thank you again for the great feedback!

Boyang
________________________________
From: Boyang Chen <bche...@outlook.com>
Sent: Thursday, November 22, 2018 3:39 PM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hey Dong, sorry for missing your message. I couldn't find your email on my thread, so I will just do a checklist here!
1) The motivation currently explicitly states that the goal is to improve performance for heavy-state applications. It seems that the motivation can be stronger with the following use case. Currently, for a MirrorMaker cluster with e.g. 100 MirrorMaker processes, it takes a long time to rolling-bounce the entire MirrorMaker cluster. Each MirrorMaker process restart triggers a rebalance, which currently pauses the consumption of all partitions of the MirrorMaker cluster. With the change stated in this patch, as long as a MirrorMaker can restart within the specified timeout (e.g. 2 minutes), we only need a constant number of rebalances (e.g. for leader restart) for the entire rolling bounce, which significantly improves the availability of the MirrorMaker pipeline. In my opinion, the main benefit of the KIP is to avoid unnecessary rebalances if the consumer process can be restarted soon, which helps performance even if the overhead of state shuffling for a given process is small.

I just rephrased this part and added it to the KIP.
Thanks for making the motivation more solid!

2) In order to simplify reading the KIP, can you follow the write-up style of other KIPs (e.g. KIP-98) and list the interface changes, such as new configs (e.g. registration timeout), new request/response, new AdminClient API and new error codes (e.g. DUPLICATE_STATIC_MEMBER)? Currently some of these are specified in the Proposed Change section, which makes it a bit inconvenient to understand the new interface that will be exposed to users. The explanation of the current two-phase rebalance protocol can probably be moved out of the public interface section.

This is a great suggestion! I just consolidated all the public API changes, and the whole KIP looks much more organized!

3) There are currently two versions of JoinGroupRequest in the KIP and only one of them has the field memberId. This seems confusing.

Yep, I already found this issue and fixed it.

4) It is mentioned in the KIP that "An admin API to force rebalance could be helpful here, but we will make a call once we finished the major implementation". So this seems to still be an open question in the current design. We probably want to agree on this before voting for the KIP.
We have finalized the idea that this API is needed.

5) The KIP currently adds a new config MEMBER_NAME for the consumer. Can you specify the name of the config key and the default config value? Possible default values include empty string or null (similar to transaction.id in producer config).

I have defined `member.name` in the "New configuration" section.

6) Regarding the use of the topic "static_member_map" to persist the member name map: currently, if the consumer coordinator broker goes offline, a rebalance is triggered and consumers will try to connect to the new coordinator.
If these consumers can connect to the new coordinator within max.poll.interval.ms, which by default is 5 minutes, and given that the broker can use a deterministic algorithm to determine the partition -> member_name mapping, each consumer should get assigned the same set of partitions without requiring state shuffling. So it is not clear whether we have a strong use case for this new logic. Can you help clarify what the benefit is of using the topic "static_member_map" to persist the member name map?

I have discussed this with Guozhang offline, and I believe reusing the existing `__consumer_offsets` topic is a better and unified solution.
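Dong's point 6 rests on the assignment being a pure function of member names, so consumers that reconnect to a new coordinator get their old partitions back even though their member.ids change. A minimal Python sketch of such a deterministic mapping, assuming a sorted round-robin strategy; `assign_by_member_name` is a hypothetical name, and in Kafka's consumer protocol the assignment is actually computed by the group leader's assignor on the client rather than by the broker.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]  # (topic, partition number)


def assign_by_member_name(member_names: List[str],
                          partitions: List[TopicPartition]) -> Dict[str, List[TopicPartition]]:
    """Hypothetical deterministic assignment keyed on member.name.

    Members and partitions are both sorted first, so the result depends only
    on which names and partitions exist, not on join order or on the
    member.id values handed out by the (new) coordinator.
    """
    ordered_members = sorted(member_names)
    if not ordered_members:
        return {}
    assignment: Dict[str, List[TopicPartition]] = {m: [] for m in ordered_members}
    for i, tp in enumerate(sorted(partitions)):
        # Round-robin over the sorted member names.
        assignment[ordered_members[i % len(ordered_members)]].append(tp)
    return assignment


# Same names, different join order (e.g. after a coordinator failover):
parts = [("mirror-topic", p) for p in range(6)]
before = assign_by_member_name(["mm-1", "mm-2", "mm-3"], parts)
after = assign_by_member_name(["mm-3", "mm-1", "mm-2"], parts)
print(before == after)  # True
print(before["mm-2"])   # [('mirror-topic', 1), ('mirror-topic', 4)]
```

Round-robin is only one choice here; any assignor whose output depends solely on the sorted names and partitions has the same property.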
7) Regarding the introduction of the expensionTimeoutMs config, it is mentioned that "we are using expansion timeout to replace rebalance timeout, which is configured by max.poll.intervals from client side, and using registration timeout to replace session timeout". Currently the default max.poll.interval.ms is configured to be 5 minutes, and there will be only one rebalance if all new consumers can join within 5 minutes. So it is not clear whether we have a strong use case for this new config. Can you explain what the benefit of introducing this new config is?

Previously, our goal was to use the expansion timeout as a workaround for the multiple rebalances triggered when scaling-up members do not join at the same time.
We have decided to address this with a client-side protocol change, so we will not introduce the expansion timeout.

8) It is mentioned that "To distinguish between previous version of protocol, we will also increase the join group request version to v4 when MEMBER_NAME is set" and "If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member Id". It is probably simpler to just say that this feature is enabled if JoinGroupRequest V4 is supported on both client and broker and MEMBER_NAME is configured with a non-empty string.

Yep, addressed this!

9) It is mentioned that the broker may return a NO_STATIC_MEMBER_INFO_SET error in OffsetCommitResponse for "commit requests under static membership". Can you clarify how the broker determines whether the commit request is under static membership?

We have agreed that commit requests shouldn't be affected by the new membership, so we removed it here. Thanks for catching this!

Let me know if you have further suggestions or concerns. Thank you for your valuable feedback that helps me design the KIP better! (And I will try to address your feedback in the next round, Mayuresh.)
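Dong's simplification in point 8 reduces the enablement rule to a single predicate. A sketch in Python, assuming the version numbers used in this draft of the KIP (static membership from JoinGroup v4, downgrade to v3 without the member name); the function names and the request dict are hypothetical, and the final protocol version may differ.

```python
from typing import Any, Dict, Optional

# Version at which this draft of KIP-345 adds the member name to JoinGroup.
STATIC_MEMBERSHIP_MIN_VERSION = 4


def negotiated_version(client_max: int, broker_max: int) -> int:
    """Use the highest JoinGroup version supported by both sides."""
    return min(client_max, broker_max)


def static_membership_enabled(client_max: int, broker_max: int,
                              member_name: Optional[str]) -> bool:
    """Enabled only if v4 is supported on both ends and member.name is non-empty."""
    return (negotiated_version(client_max, broker_max) >= STATIC_MEMBERSHIP_MIN_VERSION
            and bool(member_name))


def build_join_group(client_max: int, broker_max: int,
                     member_name: Optional[str]) -> Dict[str, Any]:
    """Hypothetical request builder: the member name is only sent when enabled,
    so talking to an older broker silently downgrades to a dynamic join."""
    request: Dict[str, Any] = {"version": negotiated_version(client_max, broker_max)}
    if static_membership_enabled(client_max, broker_max, member_name):
        request["member_name"] = member_name
    return request


print(build_join_group(4, 3, "mm-54"))  # {'version': 3}
print(build_join_group(4, 4, "mm-54"))  # {'version': 4, 'member_name': 'mm-54'}
```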
Best,
Boyang
________________________________
From: Mayuresh Gharat <gharatmayures...@gmail.com>
Sent: Wednesday, November 21, 2018 7:50 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hi Boyang,

Thanks for updating the KIP. This is a step in a good direction for stateful applications, and also for mirroring applications whose latency is affected by the rebalance issues that we have today.

I had a few questions on the current version of the KIP:

For the effectiveness of the KIP, consumer with member.name set will *not send leave group request* when they go offline

> By this you mean, even if the application has not called KafkaConsumer.poll() within session timeout, it will not be sending the LeaveGroup request, right?

Broker will maintain an in-memory mapping of {member.name → member.id} to track member uniqueness.

> When is the member.name removed from this map?
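The coordinator bookkeeping discussed in this part of the thread can be sketched as a {member.name → member.id} map that replies MISSING_MEMBER_ID when an already-registered name joins without its member.id, and that (per Boyang's answer earlier in the thread) drops an entry only when the session times out. This is a minimal Python sketch; the class, the id format, and the use of UNKNOWN_MEMBER_ID for a known name with the wrong id are assumptions for illustration, not part of the KIP text.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple

NONE = "NONE"
MISSING_MEMBER_ID = "MISSING_MEMBER_ID"
UNKNOWN_MEMBER_ID = "UNKNOWN_MEMBER_ID"  # assumed reply for a known name with the wrong id


@dataclass
class StaticMemberRegistry:
    """Hypothetical in-memory {member.name -> member.id} map kept by the coordinator."""
    members: Dict[str, str] = field(default_factory=dict)

    def join(self, member_name: str,
             member_id: Optional[str]) -> Tuple[str, Optional[str]]:
        """Return (error code, member.id) for a JoinGroup carrying a member.name."""
        known_id = self.members.get(member_name)
        if known_id is None:
            # First join under this name: issue a member.id and remember it.
            new_id = f"{member_name}-{uuid.uuid4()}"
            self.members[member_name] = new_id
            return NONE, new_id
        if not member_id:
            # Name already registered but no id supplied: an active member
            # may already be using this identity, so reject the join.
            return MISSING_MEMBER_ID, None
        if member_id != known_id:
            return UNKNOWN_MEMBER_ID, None
        # Same name and same id: the existing static member rejoining.
        return NONE, known_id

    def expire(self, member_name: str) -> None:
        """Drop the mapping once the member's session timeout has elapsed."""
        self.members.pop(member_name, None)


registry = StaticMemberRegistry()
error, member_id = registry.join("mm-54", None)
print(error)                             # NONE
print(registry.join("mm-54", None))      # ('MISSING_MEMBER_ID', None)
print(registry.join("mm-54", member_id)[0])  # NONE
```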
Member.id must be set if the *member.name* is already within the map. Otherwise reply MISSING_MEMBER_ID

> How is this case handled on the client side? What is the application that is using the KafkaConsumer supposed to do in this scenario?

Session timeout is the timeout we will trigger rebalance when a member goes offline for too long (not sending heartbeat request). To make static membership effective, we should increase the default max session timeout to 30 min so that end user could config it freely.

> This would mean that it might take more time to detect unowned topic partitions and may cause delay for applications that perform data mirroring tasks. I discussed this with our SRE and we have a suggestion to make here as listed below separately.

Currently there is a config called *rebalance timeout* which is configured by consumer *max.poll.intervals*.
The reason we set it to poll interval is because consumer could only send request within the call of poll() and we want to wait sufficient time for the join group request. When reaching rebalance timeout, the group will move towards completingRebalance stage and remove unjoined groups

> you meant remove unjoined members of the group, right?

Currently there is a config called *rebalance timeout* which is configured by consumer *max.poll.intervals*. The reason we set it to poll interval is because consumer could only send request within the call of poll() and we want to wait sufficient time for the join group request. When reaching rebalance timeout, the group will move towards completingRebalance stage and remove unjoined groups. This is actually conflicting with the design of static membership, because those temporarily unavailable members will potentially reattempt the join group and trigger extra rebalances. Internally we would optimize this logic by having rebalance timeout only in charge of stopping prepare rebalance stage, without removing non-responsive members immediately.
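The optimization in the KIP excerpt above separates two timers: the rebalance timeout only closes the prepare-rebalance stage, while eviction of a static member is left to the session timeout. A minimal Python sketch of that split, assuming a simplified member record; the `Member` fields and both function names are hypothetical and not taken from the coordinator code.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Member:
    member_id: str
    is_static: bool         # configured with member.name
    joined: bool            # sent a JoinGroup during the current rebalance
    last_heartbeat_ms: int  # time of the last heartbeat seen by the coordinator


def on_rebalance_timeout(members: List[Member]) -> List[Member]:
    """Close the prepare-rebalance stage.

    Existing behavior drops every member that has not joined yet. Under the
    proposed optimization, unjoined static members are kept, so a late
    JoinGroup from them does not trigger another rebalance.
    """
    return [m for m in members if m.joined or m.is_static]


def on_session_check(members: List[Member], now_ms: int,
                     session_timeout_ms: int) -> List[Member]:
    """Evict members whose heartbeats stopped for longer than the session timeout.

    Combined with on_rebalance_timeout, this is the only path that removes
    a static member from the group.
    """
    return [m for m in members if now_ms - m.last_heartbeat_ms <= session_timeout_ms]


members = [Member("static-a", True, False, 0),
           Member("dynamic-b", False, False, 0),
           Member("dynamic-c", False, True, 8_000)]
kept = on_rebalance_timeout(members)
print([m.member_id for m in kept])                                   # ['static-a', 'dynamic-c']
print([m.member_id for m in on_session_check(kept, 10_000, 5_000)])  # ['dynamic-c']
```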
> What do you mean by "Internally we would optimize this logic by having rebalance timeout only in charge of stopping prepare rebalance stage, without removing non-responsive members immediately." There would not be a full rebalance if the lagging consumer sent a JoinGroup request later, right? If yes, can you highlight this in the KIP?

Scale Up

> The KIP talks about the scale up scenario but it's not quite clear how we handle it. Are we adding a separate "expansion.timeout" or are we adding a status "learner"? Can you shed more light on how this is handled in the KIP, if it's handled?

*Discussion*
Larger session timeouts causing latency rise for getting data for un-owned topic partitions:

> I think Jason had brought this up earlier about having a way to say how many members/consumer hosts you are choosing to have in the consumer group.
> If we can do this, then in case of mirroring applications we can do this:
> Let's say we have a mirroring application that consumes from Kafka cluster A and produces to Kafka cluster B.
> Depending on the data and the Kafka cluster configuration, Kafka service providers can set a mirroring group saying that it will take, for example, 300 consumer hosts/members to achieve the desired throughput and latency for mirroring, and can have an additional 10 consumer hosts as spares in the same group.
> So the first 300 members/consumers to join the group will start mirroring the data from Kafka cluster A to Kafka cluster B.
> The remaining 10 consumer members can sit idle.
> The moment one of the consumers (for example, consumer number 54) from the first 300 members goes out of the group (crosses the session timeout), it (the groupCoordinator) can just assign the topicPartitions from consumer member 54 to one of the spare hosts.
> Once consumer member 54 comes back up, it can rejoin as part of the spare pool.
> This enables us to have lower session timeouts and low-latency mirroring, in cases where the service providers are OK with having spare hosts.
> This would mean that we would tolerate n consumer members leaving and rejoining the group and still provide low latency as long as n <= number of spare consumers.
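Mayuresh's spare-host proposal (deferred to "Future works" in Boyang's reply) needs only a small amount of state: active members with their partitions, plus a FIFO pool of idle spares. A minimal Python sketch with the 300 + 10 example scaled down; `SparePoolGroup` and its methods are hypothetical names, and a `False` return stands for falling back to the regular rebalance path described in the KIP once no spare is left.

```python
from collections import deque
from typing import Deque, Dict, List


class SparePoolGroup:
    """Hypothetical coordinator state for an active/spare consumer split."""

    def __init__(self, assignment: Dict[str, List[int]], spares: List[str]) -> None:
        self.assignment = assignment             # active member -> partitions
        self.spares: Deque[str] = deque(spares)  # idle members, first in first out

    def on_member_expired(self, member: str) -> bool:
        """Hand an expired member's partitions to a spare without a full rebalance.

        Returns False when no spare is left; the caller would then fall back
        to the behavior described in the KIP.
        """
        if not self.spares:
            return False
        replacement = self.spares.popleft()
        self.assignment[replacement] = self.assignment.pop(member)
        return True

    def on_member_returned(self, member: str) -> None:
        """A member that comes back up rejoins as a spare."""
        self.spares.append(member)


group = SparePoolGroup({"m1": [0, 1], "m2": [2, 3], "m3": [4, 5]}, ["s1"])
print(group.on_member_expired("m2"), group.assignment["s1"])  # True [2, 3]
print(group.on_member_expired("m1"))                          # False (no spare left)
group.on_member_returned("m2")
print(group.on_member_expired("m1"), group.assignment["m2"])  # True [0, 1]
```

With k spares, the first k expirations are absorbed without moving any other member's partitions, which matches the "n <= number of spare consumers" condition in the proposal.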
> If there are no spare hosts available, we can fall back to the idea as described in the KIP.

Thanks,
Mayuresh

On Tue, Nov 20, 2018 at 10:18 AM Konstantine Karantasis <konstant...@confluent.io> wrote:

Hi Boyang.

Thanks for preparing this KIP! It is making good progress and will be a great improvement for stateful Kafka applications.

Apologies for my late reply, I was away for a while. Lots of great comments so far, so at this point I'll probably second most of them in what I suggest below.

When I first read the KIP, I wanted to start at the end with something that wasn't highlighted a lot: the topic of handling duplicate members. I see now that the initial suggestion of handling this situation during offset commit has been removed, and I agree with that. Issues related to membership seem to be handled better when the member joins the group rather than when it tries to commit offsets.
This also reduces the number of request types that need to change in order to incorporate the new member name field.

I also agree with what Jason and Guozhang have said regarding timeouts. Although, semantically, it's easier to think of every operation having its own timeout, operationally this can become a burden. Thus, consolidation seems preferable here. The definition of embedded protocols on top of the base group membership protocol for rebalancing gives enough flexibility to address such needs in each client component separately.

Finally, some minor comments:
In a few places the new/proposed changes are referred to as "current", which is a bit confusing considering that there is a protocol in place already, and by "current" someone might understand the existing one. I'd recommend using new/proposed or equivalent when referring to changes introduced with KIP-345, and current/existing or equivalent when referring to existing behavior.

There's the following sentence in the "Public Interfaces" section:
"Since for many stateful consumer/stream applications, the state shuffling is more painful than short time partial unavailability."
However, my understanding is that the changes proposed with KIP-345 will not exploit any partial availability. A suggestion for dealing with temporary imbalances has been made in "Incremental Cooperative Rebalancing", which can work well with KIP-345, but here I don't see proposed changes suggesting that some resources (e.g. partitions) will keep being used while others will not be utilized. Thus, you might want to adjust this sentence. Correct me if I'm missing something related to that.

In the rejected alternatives, under point 2), I read "we can copy the member id to the config files". I believe it means to say "member name", unless I'm missing something about reusing member ids. Also, below I read: "By allowing consumers to optionally specifying a member id", which probably implies "member name" again. In a sense, this section highlights a potential confusion between member name and member id. I wonder if we could come up with a better term for the new field. StaticTag, StaticLabel, or even StaticName are some suggestions that could potentially help with the confusion between MemberId and MemberName and what corresponds to what.
But I wouldn't like to disrupt the discussion with naming conventions too much at this point. I just mention it here as a thought.

Looking forward to seeing the final details of this KIP. Great work so far!

Konstantine

On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen <bche...@outlook.com> wrote:

Thanks Guozhang for the great summary here, and I have been following up on the action items.

1. I already updated the KIP to remove the expansion timeout and registration timeout. Great to see them being addressed on the client side!
2. I double-checked the design and I believe that it is OK to have both static members and dynamic members co-exist in the same group. So the upgrade shouldn't be destructive, and we are removing the two membership protocol switching APIs.
3. I only have a question about this one. I'm still reading the KafkaApis code here. Should I just use the same authorization logic for ForceStaticRebalanceRequest as for JoinGroupRequest?
4. I'm very excited to see this work with K8s!
Like you suggested, this feature could be better addressed in a separate KIP because it is pretty independent. I could start drafting the KIP once the current proposal is approved.
5. I believe that we don't need fencing in the offset commit request, since the duplicate member.name issue could be handled by the join group request. We shall reject a join group request with a known member name but no member id (which means we already have an active member using this identity).
6. I agree to remove that internal config once we move forward with static membership. And I already removed the entire section from the KIP.

Let me know if you have other concerns.

Best,
Boyang
________________________________
From: Guozhang Wang <wangg...@gmail.com>
Sent: Tuesday, November 20, 2018 4:21 PM
To: dev
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

Hello Boyang,

Thanks a lot for the KIP! It is a great write-up and I appreciate your patience in answering the feedback from the community.
I'd like to add my 2 cents here:

1. By introducing another two timeout configs, registration_timeout and expansion_timeout, we would effectively have four timeout configs: session timeout, rebalance timeout (configured as "max.poll.interval.ms" on the client side), and these two. The interplay of these timeout configs can be quite hard for users to reason about, and hence I'm wondering if we can simplify the situation with as few timeout configs as possible. Here is a concrete suggestion I'd like to propose:

1.a) Instead of introducing a registration_timeout in addition to the session_timeout for static members, we can just reuse the session_timeout and ask users to set it to a larger value when they are upgrading a dynamic client to a static client by setting the "member.name" at the same time.
By default, the broker-side group.min.session.timeout.ms is 6 seconds and group.max.session.timeout.ms is 5 minutes, which seems reasonable to me (we can of course modify these broker configs to enlarge the valid interval if we want in practice). And then we should also consider removing the condition for marking a client as failed if the rebalance timeout has been reached while the JoinGroup was not received, so that the semantics of session_timeout and rebalance_timeout are totally separated: the former is only used to determine if a consumer member of the group should be marked as failed and kicked out of the group, and the latter is only used to determine the longest time the coordinator should wait in the PREPARE_REBALANCE phase. In other words, if a member did not send the JoinGroup within the rebalance_timeout, we still include it in the new generation of the group and use its old subscription info to send to the leader for assignment.
Later, if the member comes back with a HeartBeat request, we can still follow the normal path to bring it to the latest generation while checking that the JoinGroup request it sends contains the same subscription info we used to assign its partitions previously (which is likely to be the case in practice). In addition, we should let static members not send the LeaveGroup request when they are gracefully shut down, so that a static member can only leave the group if its session has timed out, OR it has been indicated to no longer exist in the group (details below).

1.b) We have a parallel discussion about Incremental Cooperative Rebalancing, in which we will encode the "when to rebalance" logic at the application level, instead of at the protocol level. By doing this we can also enable a few other optimizations, e.g.
at the Streams level, to first build up the state store as standby tasks and then trigger a second rebalance to actually migrate the active tasks, while keeping the actual rebalance latency, and hence the unavailability window, small (https://issues.apache.org/jira/browse/KAFKA-6145). I'd propose we align KIP-345 with this idea, and hence do not add the expansion_timeout as part of the protocol layer, but only do that at the application's coordinator / assignor layer (Connect, Streams, etc).
We can still deprecate "group.initial.rebalance.delay.ms" as part of this KIP, though, since we have discussed its limits and think it is actually not a very good design and could be replaced with the client-side logic above.

2. I'd like to see your thoughts on the upgrade path for this KIP. More specifically, let's say after we have upgraded the brokers to a version that recognizes the new versions of the JoinGroup request and the admin requests, how should we upgrade the clients and enable static groups?
Off the top of my head, if we do a rolling bounce in which we set the member.name config, and optionally increase the session.timeout.ms config, as we bounce each instance, then during these rolling bounces we will have a group containing both dynamic members and static members. It means that we should have the group allow such a scenario (i.e. we cannot reject JoinGroup requests from dynamic members), and hence the "member.name" -> "member.id" mapping will only be partial in this scenario. Also, could you describe whether the upgrade to the first version that supports this feature would ever get any benefits, or whether only future rolling bounces could benefit from this feature?

If that's the case and we will do 1) as suggested above, do we still need the enableStaticMembership and enableDynamicMembership admin requests any more?
It seems they are not necessary any more, as we will only have the notion of "dynamic or static members" that can co-exist in a group, while there is no notion of "dynamic or static groups", and hence these two requests are not needed anymore.

3. We need to briefly talk about the implications for ACLs as we introduce new admin requests that are related to a specific group.id. For example, we need to make sure that whoever created the group or joined the group can actually send admin requests for the group; otherwise the application owners would need to bother the Kafka operators on a multi-tenant cluster every time they want to send any admin request for their groups, which would be an operational nightmare.

4. I like Jason's suggestion of adding an optional field for the list of member names, and I'm wondering if that can be done as part of the forceStaticRebalance request: i.e.
by passing a list of members, we will enforce a rebalance immediately since it indicates that some static members will be officially kicked out of the group and some new static members may be added. So back to 1.a) above, a static member can only be kicked out of the group if a) its session (arguably a long period of time) has timed out, and b) this admin request explicitly states that it is no longer part of the group. As for execution, I'm fine with keeping it as future work of this KIP if you'd like to make its scope smaller.

Following are minor comments:

5. I'm not sure if we need to include "member.name" as part of the OffsetCommitRequest for fencing purposes, as I think the memberId plus the generation number should be sufficient for fencing even with static members.

6. As mentioned above, if we agree to do 1) we can get rid of the "LEAVE_GROUP_ON_CLOSE_CONFIG" config.
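The coordinator-side rules proposed in 1.a), 4) and 5) can be sketched as a small, in-memory Java model. This is a hypothetical illustration, not Kafka's GroupCoordinator: the class, method and enum names are invented, member.name is the config proposed in this thread, the removal check follows the OR condition stated in 1.a) (session expiry, or an explicit admin removal), and the 6-second / 5-minute bounds are the broker defaults cited above.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy model of the rules in points 1.a), 4) and 5); NOT Kafka's GroupCoordinator.
class StaticMembershipSketch {
    // Broker-side session timeout bounds cited in 1.a) (6 s and 5 min defaults).
    static final long MIN_SESSION_MS = 6_000L;
    static final long MAX_SESSION_MS = 300_000L;

    enum CommitError { NONE, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION }

    static final class Member {
        final String memberId;        // broker-assigned member.id
        final String memberName;      // proposed member.name; null for a dynamic member
        final long sessionTimeoutMs;
        final long lastHeartbeatMs;

        Member(String memberId, String memberName, long sessionTimeoutMs, long lastHeartbeatMs) {
            this.memberId = memberId;
            this.memberName = memberName;
            this.sessionTimeoutMs = sessionTimeoutMs;
            this.lastHeartbeatMs = lastHeartbeatMs;
        }

        boolean isStatic() {
            return memberName != null;
        }
    }

    // A static member may ask for a larger session timeout, but only within the broker bounds.
    static boolean sessionTimeoutValid(long sessionTimeoutMs) {
        return sessionTimeoutMs >= MIN_SESSION_MS && sessionTimeoutMs <= MAX_SESSION_MS;
    }

    // 1.a) / 4): only session expiry marks a member as failed; a static member can additionally
    // be removed when an admin request names it. Static members send no LeaveGroup on close,
    // and missing the rebalance (JoinGroup) deadline is deliberately not a removal reason.
    static boolean shouldRemove(Member m, long nowMs, Set<String> namesRemovedByAdmin) {
        boolean sessionExpired = nowMs - m.lastHeartbeatMs > m.sessionTimeoutMs;
        boolean removedByAdmin = m.isStatic() && namesRemovedByAdmin.contains(m.memberName);
        return sessionExpired || removedByAdmin;
    }

    // Member ids carried into the next generation. A kept member that has not re-sent
    // JoinGroup would be assigned using its previous subscription (not modeled here).
    static List<String> nextGeneration(Collection<Member> members, long nowMs,
                                       Set<String> namesRemovedByAdmin) {
        List<String> kept = new ArrayList<>();
        for (Member m : members) {
            if (!shouldRemove(m, nowMs, namesRemovedByAdmin)) {
                kept.add(m.memberId);
            }
        }
        return kept;
    }

    // 5): fencing an OffsetCommit only needs member id + generation, not member.name.
    static CommitError fenceOffsetCommit(Map<String, Member> membersById, int currentGeneration,
                                         String memberId, int generation) {
        if (!membersById.containsKey(memberId)) {
            return CommitError.UNKNOWN_MEMBER_ID;
        }
        if (generation != currentGeneration) {
            return CommitError.ILLEGAL_GENERATION;
        }
        return CommitError.NONE;
    }
}
```

Missing the JoinGroup deadline never appears in shouldRemove; that omission is the separation between session_timeout and rebalance_timeout that 1.a) asks for.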
Guozhang

On Sat, Nov 17, 2018 at 5:53 PM Dong Lin <lindon...@gmail.com> wrote:

Hey Boyang,

Thanks for the proposal! This is very useful. I have some comments below:

1) The motivation currently explicitly states that the goal is to improve performance for heavy-state applications. It seems that the motivation can be stronger with the following use-case. Currently, for a MirrorMaker cluster with e.g. 100 MirrorMaker processes, it will take a long time to rolling-bounce the entire MirrorMaker cluster. Each MirrorMaker process restart will trigger a rebalance, which currently pauses the consumption of all partitions of the MirrorMaker cluster. With the change stated in this patch, as long as a MirrorMaker can restart within the specified timeout (e.g. 2 minutes), we only need a constant number of rebalances (e.g.
for leader restart) for the entire rolling bounce, which will significantly improve the availability of the MirrorMaker pipeline. In my opinion, the main benefit of the KIP is to avoid unnecessary rebalances if the consumer process can be restarted soon, which helps performance even if the overhead of state shuffling for a given process is small.

2) In order to simplify reading the KIP, can you follow the writeup style of other KIPs (e.g. KIP-98) and list the interface changes, such as new configs (e.g. registration timeout), new request/response, new AdminClient API and new error code (e.g. DUPLICATE_STATIC_MEMBER)? Currently some of these are specified in the Proposed Change section, which makes it a bit inconvenient to understand the new interface that will be exposed to users. The explanation of the current two-phase rebalance protocol can probably be moved out of the public interface section.
3) There are currently two versions of JoinGroupRequest in the KIP, and only one of them has the field memberId. This seems confusing.

4) It is mentioned in the KIP that "An admin API to force rebalance could be helpful here, but we will make a call once we finished the major implementation". So this seems to be still an open question in the current design. We probably want to agree on this before voting for the KIP.

5) The KIP currently adds a new config MEMBER_NAME for the consumer. Can you specify the name of the config key and the default config value? Possible default values include empty string or null (similar to transactional.id in the producer config).

6) Regarding the use of the topic "static_member_map" to persist the member name map: currently, if the consumer coordinator broker goes offline, a rebalance is triggered and consumers will try to connect to the new coordinator.
If these consumers can connect to the new coordinator within max.poll.interval.ms, which by default is 5 minutes, then, given that the broker can use a deterministic algorithm to determine the partition -> member_name mapping, each consumer should get assigned the same set of partitions without requiring state shuffling. So it is not clear whether we have a strong use-case for this new logic. Can you help clarify what the benefit is of using the topic "static_member_map" to persist the member name map?

7) Regarding the introduction of the expansionTimeoutMs config, it is mentioned that "we are using expansion timeout to replace rebalance timeout, which is configured by max.poll.intervals from client side, and using registration timeout to replace session timeout". Currently the default max.poll.interval.ms is configured to be 5 minutes, and there will be only one rebalance if all new consumers can join within 5 minutes. So it is not clear whether we have a strong use-case for this new config.
Can you explain what the benefit is of introducing this new config?

8) It is mentioned that "To distinguish between previous version of protocol, we will also increase the join group request version to v4 when MEMBER_NAME is set" and "If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member Id". It is probably simpler to just say that this feature is enabled if JoinGroupRequest V4 is supported on both client and broker and MEMBER_NAME is configured with a non-empty string.

9) It is mentioned that the broker may return a NO_STATIC_MEMBER_INFO_SET error in OffsetCommitResponse for "commit requests under static membership". Can you clarify how the broker determines whether the commit request is under static membership?

Thanks,
Dong

--
-- Guozhang

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125
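Dong's points 6) and 8) above can be made concrete with a small Java sketch. It assumes JoinGroup v4 is the version the KIP draft proposed for carrying MEMBER_NAME, and it uses round-robin over sorted member names only as one example of a deterministic partition -> member_name mapping; the thread does not specify an algorithm, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of Dong's points 6) and 8); not part of any Kafka API.
class StaticAssignmentSketch {
    // JoinGroup version that the KIP draft proposed for carrying MEMBER_NAME (assumption).
    static final int STATIC_JOIN_GROUP_VERSION = 4;

    // 8): enabled only if the negotiated JoinGroup version (the highest version both
    // sides support) is at least v4 and a non-empty member name is configured.
    static boolean staticMembershipEnabled(int clientMaxVersion, int brokerMaxVersion, String memberName) {
        int negotiated = Math.min(clientMaxVersion, brokerMaxVersion);
        return negotiated >= STATIC_JOIN_GROUP_VERSION && memberName != null && !memberName.isEmpty();
    }

    // 6): a deterministic partition -> member_name mapping (round-robin over sorted names,
    // chosen here only for illustration). It ignores broker-generated member ids, so a
    // consumer restarting under the same name gets the same partitions back.
    static Map<Integer, String> assign(List<String> memberNames, int numPartitions) {
        Map<Integer, String> owner = new TreeMap<>();
        if (memberNames.isEmpty()) {
            return owner; // nothing to assign to; also avoids modulo by zero below
        }
        List<String> sorted = new ArrayList<>(memberNames);
        Collections.sort(sorted);
        for (int p = 0; p < numPartitions; p++) {
            owner.put(p, sorted.get(p % sorted.size()));
        }
        return owner;
    }
}
```

Because assign depends only on the set of names, the same names yield the same mapping no matter which member ids the broker hands out after a restart, which is the property point 6) relies on.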