Hi Boyang, Do you have a discussion thread for KIP-394 that you mentioned here?
Thanks,
Mayuresh

On Mon, Nov 26, 2018 at 4:52 AM Boyang Chen <bche...@outlook.com> wrote:

> Hey Dong, thanks for the follow-up here!
>
> 1) It is not very clear to the user what is the difference between
> member.name and client.id as both seem to be used to identify the
> consumer. I am wondering if it would be more intuitive to name it
> group.member.name (preferred choice since it matches the current
> group.id config name) or rebalance.member.name to explicitly show that
> the id is solely used for rebalance.
>
> Great question. I feel `member.name` is enough to explain itself; it
> seems not very helpful to make the config name longer. Comparing `name`
> with `id` gives the user the impression that they have control over it
> via a customized rule, rather than one decided by the library.
>
> 2) In the interface change section it is said that
> GroupMaxSessionTimeoutMs will be changed to 30 minutes. It seems to
> suggest that we will change the default value of this config. It does
> not seem necessary to increase the time of consumer failure detection
> when the user doesn't use static membership. Also, say static
> membership is enabled, then this default config change will cause a
> partition to be unavailable for consumption for 30 minutes if there is
> a hard consumer failure, which seems to be a worse experience than
> having an unnecessary rebalance (when this timeout is small),
> particularly for new users of Kafka. Could you explain more why we
> should make this change?
>
> We are not changing the default session timeout value. We are just
> changing the cap we are enforcing on the session timeout max value. So
> this change does not affect which kind of membership the end user is
> using, and loosening the cap gives the end user more flexibility in the
> trade-off between liveness and stability.
>
> 3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER
> into one error? It seems that these two errors are currently handled by
> the consumer in the same way.
> And we also don't expect MEMBER_ID_MISMATCH to happen. Thus it is not
> clear what is the benefit of having two errors.
>
> I agree that we should remove the DUPLICATE_STATIC_MEMBER error because
> with KIP-394
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member+id+for+initial+join+group+request>
> we will automatically fence all join requests with UNKNOWN_MEMBER_ID.
>
> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains
> member name which is already in the consumer group, however the member id
> was missing". After a consumer is restarted, it will send a
> JoinGroupRequest with an existing memberName (as the coordinator has not
> expired this member from memory) and memberId
> = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted
> across consumer restart on the consumer side). Does it mean that a
> JoinGroupRequest from a newly restarted consumer will always be rejected
> until the sessionTimeoutMs has passed?
>
> Same answer as question 3). This part of the logic shall be removed from
> the proposal.
>
> 5) It seems that we always add two methods to the interface
> org.apache.kafka.clients.admin.AdminClient.java, one with options and the
> other without options. Could this be specified in the interface change
> section?
>
> Sounds good! Added both methods.
>
> 6) Do we plan to have an off-the-shelf command line tool for SREs to
> trigger a rebalance? If so, we probably want to specify the command line
> tool interface similar to
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts
>
> Added the script.
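As an aside on point 5): the AdminClient convention of pairing each operation with an options overload can be sketched roughly as below. All names here (`invokeConsumerRebalance`, `InvokeConsumerRebalanceOptions`) are hypothetical placeholders drawn from this discussion, not the final KIP-345 API.

```java
// Hypothetical sketch of the AdminClient convention discussed in point 5:
// each operation gets one method taking an explicit options object, plus a
// convenience overload supplying default options. Names are illustrative.
class InvokeConsumerRebalanceOptions {
    private int timeoutMs = 30_000;

    public InvokeConsumerRebalanceOptions timeoutMs(int timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    public int timeoutMs() {
        return timeoutMs;
    }
}

interface AdminClientSketch {
    // full variant: caller controls the options
    String invokeConsumerRebalance(String groupId, InvokeConsumerRebalanceOptions options);

    // convenience variant: defaults, mirroring the existing AdminClient style
    default String invokeConsumerRebalance(String groupId) {
        return invokeConsumerRebalance(groupId, new InvokeConsumerRebalanceOptions());
    }
}
```

A caller would then use the short form, e.g. `admin.invokeConsumerRebalance("my-group")`, for the common case and reach for the options overload only when overriding defaults.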
> 7) Would it be simpler to replace the name "forceStaticRebalance" with
> "invokeConsumerRebalance"? It is not very clear what is the extra meaning
> of the word "force" as compared to "trigger" or "invoke". And it seems
> simpler to allow this API to trigger a rebalance regardless of whether
> the consumer is configured with memberName.
>
> Sounds good. Right now I feel for both static and dynamic membership it
> is more manageable to introduce the consumer rebalance method through the
> admin client API.
>
> 8) It is not very clear how the newly added AdminClient API triggers a
> rebalance. For example, does it send a request? Can this be explained in
> the KIP?
>
> Sure, I will add more details to the API.
>
> Thanks again for the helpful suggestions!
>
> Best,
> Boyang
>
> ________________________________
> From: Dong Lin <lindon...@gmail.com>
> Sent: Saturday, November 24, 2018 2:54 PM
> To: dev
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hey Boyang,
>
> Thanks for the update! Here are some followup comments:
>
> 1) It is not very clear to the user what is the difference between
> member.name and client.id as both seem to be used to identify the
> consumer. I am wondering if it would be more intuitive to name it
> group.member.name (preferred choice since it matches the current
> group.id config name) or rebalance.member.name to explicitly show that
> the id is solely used for rebalance.
>
> 2) In the interface change section it is said that
> GroupMaxSessionTimeoutMs will be changed to 30 minutes. It seems to
> suggest that we will change the default value of this config. It does
> not seem necessary to increase the time of consumer failure detection
> when the user doesn't use static membership.
> Also, say static membership is enabled, then this default config change
> will cause a partition to be unavailable for consumption for 30 minutes
> if there is a hard consumer failure, which seems to be a worse experience
> than having an unnecessary rebalance (when this timeout is small),
> particularly for new users of Kafka. Could you explain more why we should
> make this change?
>
> 3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER
> into one error? It seems that these two errors are currently handled by
> the consumer in the same way. And we also don't expect MEMBER_ID_MISMATCH
> to happen. Thus it is not clear what is the benefit of having two errors.
>
> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains
> member name which is already in the consumer group, however the member id
> was missing". After a consumer is restarted, it will send a
> JoinGroupRequest with an existing memberName (as the coordinator has not
> expired this member from memory) and memberId
> = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted
> across consumer restart on the consumer side). Does it mean that a
> JoinGroupRequest from a newly restarted consumer will always be rejected
> until the sessionTimeoutMs has passed?
>
> 5) It seems that we always add two methods to the interface
> org.apache.kafka.clients.admin.AdminClient.java, one with options and the
> other without options. Could this be specified in the interface change
> section?
>
> 6) Do we plan to have an off-the-shelf command line tool for SREs to
> trigger a rebalance?
> If so, we probably want to specify the command line tool interface
> similar to
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts
>
> 7) Would it be simpler to replace the name "forceStaticRebalance" with
> "invokeConsumerRebalance"? It is not very clear what is the extra meaning
> of the word "force" as compared to "trigger" or "invoke". And it seems
> simpler to allow this API to trigger a rebalance regardless of whether
> the consumer is configured with memberName.
>
> 8) It is not very clear how the newly added AdminClient API triggers a
> rebalance. For example, does it send a request? Can this be explained in
> the KIP?
>
> Thanks,
> Dong
>
>
> On Thu, Nov 22, 2018 at 6:37 AM Boyang Chen <bche...@outlook.com> wrote:
>
> > Hey Mayuresh,
> >
> > thanks for your feedback! I will try to do another checklist here.
> >
> > > By this you mean, even if the application has not called
> > > KafkaConsumer.poll() within session timeout, it will not be sending
> > > the LeaveGroup request, right?
> >
> > Yep, it's true; we will prevent the client from sending a leave group
> > request when it is set with `member.name`.
> >
> > > When is the member.name removed from this map?
> >
> > Good question, we will only kick out a member due to session timeout
> > within static membership. Let me update the KIP to clearly assert that.
> >
> > > How is this case (missing member id) handled on the client side? What
> > > is the application that is using the KafkaConsumer supposed to do in
> > > this scenario?
> >
> > I have extended the two exceptions within join group response V4.
> > Basically I define both corresponding actions to be immediately
> > failing the client application, because so far it is unknown what kind
> > of client issue could trigger them. After the first version, we will
> > keep enhancing the error handling logic!
> >
> > > This would mean that it might take more time to detect unowned topic
> > > partitions and may cause delay for applications that perform data
> > > mirroring tasks. I discussed this with our SREs and we have a
> > > suggestion to make here as listed below separately.
> >
> > The goal of extending the session timeout cap is for users with good
> > client side monitoring tools that could auto-heal dead consumers very
> > fast. So it is optional (and personal) to extend the session timeout
> > to a reasonable number for different client scenarios.
> >
> > > you meant remove unjoined members of the group, right ?
> >
> > Yep, there is a typo. Thanks for catching this!
> >
> > > What do you mean by "Internally we would optimize this logic by
> > > having rebalance timeout only in charge of stopping prepare rebalance
> > > stage, without removing non-responsive members immediately." There
> > > would not be a full rebalance if the lagging consumer sent a
> > > JoinGroup request later, right ? If yes, can you highlight this in
> > > the KIP ?
> >
> > No, there won't be. We want to limit the rebalance timeout
> > functionality to serve only as a timer to end the prepare rebalance
> > stage. This way, late-joining static members will not trigger further
> > rebalances as long as they are within the session timeout. I added your
> > highlight to the KIP!
> >
> > > The KIP talks about the scale up scenario but it's not quite clear
> > > how we handle it. Are we adding a separate "expansion.timeout" or are
> > > we adding status "learner"? Can you shed more light on how this is
> > > handled in the KIP, if it's handled?
> > Updated the KIP: we shall not cover the scale-up case in KIP-345,
> > because we believe the client side could better handle this logic.
> >
> > > I think Jason had brought this up earlier about having a way to say
> > > how many members/consumer hosts are you choosing to be in the
> > > consumer group. If we can do this, then in case of mirroring
> > > applications we can do this:
> > > Let's say we have a mirroring application that consumes from Kafka
> > > cluster A and produces to Kafka cluster B.
> > > Depending on the data and the Kafka cluster configuration, Kafka
> > > service providers can set a mirroring group saying that it will take,
> > > for example, 300 consumer hosts/members to achieve the desired
> > > throughput and latency for mirroring, and can have an additional 10
> > > consumer hosts as spares in the same group.
> > > So the first 300 members/consumers to join the group will start
> > > mirroring the data from Kafka cluster A to Kafka cluster B.
> > > The remaining 10 consumer members can sit idle.
> > > The moment one of the consumers (for example: consumer number 54)
> > > from the first 300 members goes out of the group (crossed session
> > > timeout), it (the groupCoordinator) can just assign the
> > > topicPartitions from consumer member 54 to one of the spare hosts.
> > > Once consumer member 54 comes back up, it can start as being a part
> > > of the spare pool.
> > > This enables us to have lower session timeouts and low latency
> > > mirroring, in cases where the service providers are OK with having
> > > spare hosts.
> > > This would mean that we would tolerate n consumer members leaving and
> > > rejoining the group and still provide low latency as long as n <=
> > > number of spare consumers.
> > > If there are no spare hosts available, we can get back to the idea as
> > > described in the KIP.
> >
> > Great idea!
> > In fact, on top of static membership we could later introduce APIs to
> > set hard-coded client ids for the group and replace a dead host, or,
> > as you proposed, define a spare host as what I understood to be a hot
> > backup. I will put both Jason's and your suggestions into a separate
> > section called "Future works". Note that this spare host idea may also
> > be solvable through the rebalance protocol IMO.
> >
> > Thank you again for the great feedback!
> >
> > Boyang
> > ________________________________
> > From: Boyang Chen <bche...@outlook.com>
> > Sent: Thursday, November 22, 2018 3:39 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > specifying member id
> >
> > Hey Dong, sorry for missing your message. I couldn't find your email
> > on my thread, so I will just do a checklist here!
> >
> > 1) The motivation currently explicitly states that the goal is to
> > improve performance for heavy state applications. It seems that the
> > motivation can be stronger with the following use-case. Currently for
> > a MirrorMaker cluster with e.g. 100 MirrorMaker processes, it will
> > take a long time to rolling bounce the entire MirrorMaker cluster.
> > Each MirrorMaker process restart will trigger a rebalance which
> > currently pauses the consumption of all partitions of the MirrorMaker
> > cluster. With the change stated in this patch, as long as a
> > MirrorMaker can restart within the specified timeout (e.g. 2 minutes),
> > then we only need a constant number of rebalances (e.g. for leader
> > restart) for the entire rolling bounce, which will significantly
> > improve the availability of the MirrorMaker pipeline. In my opinion,
> > the main benefit of the KIP is to avoid unnecessary rebalances if the
> > consumer process can be restarted soon, which helps performance even
> > if the overhead of state shuffling for a given process is small.
> > I just rephrased this part and added it to the KIP. Thanks for making
> > the motivation more solid!
> >
> > 2) In order to simplify the KIP reading, can you follow the writeup
> > style of other KIPs (e.g. KIP-98) and list the interface changes such
> > as new configs (e.g. registration timeout), new request/response, new
> > AdminClient API and new error codes (e.g. DUPLICATE_STATIC_MEMBER)?
> > Currently some of these are specified in the Proposed Change section
> > which makes it a bit inconvenient to understand the new interface that
> > will be exposed to the user. Explanation of the current two-phase
> > rebalance protocol probably can be moved out of the public interface
> > section.
> >
> > This is a great suggestion! I just consolidated all the public API
> > changes, and the whole KIP looks much more organized!
> >
> > 3) There are currently two versions of JoinGroupRequest in the KIP and
> > only one of them has the field memberId. This seems confusing.
> >
> > Yep, I already found this issue and fixed it.
> >
> > 4) It is mentioned in the KIP that "An admin API to force rebalance
> > could be helpful here, but we will make a call once we finished the
> > major implementation". So this seems to be still an open question in
> > the current design. We probably want to agree on this before voting
> > for the KIP.
> >
> > We have finalized the idea that this API is needed.
> >
> > 5) The KIP currently adds a new config MEMBER_NAME for the consumer.
> > Can you specify the name of the config key and the default config
> > value?
> > Possible default values include the empty string or null (similar to
> > transactional.id in the producer config).
> >
> > I have defined the `member.name` in the "New configuration" section.
> >
> > 6) Regarding the use of the topic "static_member_map" to persist the
> > member name map: currently if the consumer coordinator broker goes
> > offline, a rebalance is triggered and consumers will try to connect to
> > the new coordinator. If these consumers can connect to the new
> > coordinator within max.poll.interval.ms, which by default is 5
> > minutes, given that the broker can use a deterministic algorithm to
> > determine the partition -> member_name mapping, each consumer should
> > get assigned the same set of partitions without requiring state
> > shuffling. So it is not clear whether we have a strong use-case for
> > this new logic.
> > Can you help clarify what is the benefit of using the topic
> > "static_member_map" to persist the member name map?
> >
> > I have discussed with Guozhang offline, and I believe reusing the
> > current `__consumer_offsets` topic is a better and unified solution.
> >
> > 7) Regarding the introduction of the expansionTimeoutMs config, it is
> > mentioned that "we are using expansion timeout to replace rebalance
> > timeout, which is configured by max.poll.intervals from client side,
> > and using registration timeout to replace session timeout". Currently
> > the default max.poll.interval.ms is configured to be 5 minutes and
> > there will be only one rebalance if all new consumers can join within
> > 5 minutes. So it is not clear whether we have a strong use-case for
> > this new config. Can you explain what is the benefit of introducing
> > this new config?
> >
> > Previously our goal was to use the expansion timeout as a workaround
> > for triggering multiple rebalances when scaling-up members are not
> > joining at the same time. It has been decided to address this with a
> > client-side protocol change, so we will not introduce the expansion
> > timeout.
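Dong's observation in 6) — that a deterministic partition -> member_name mapping lets a restarted consumer reclaim the same partitions without any state being persisted — can be illustrated with a toy assignor. This is only an illustrative sketch, not the assignment algorithm proposed by the KIP:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy sketch of a deterministic partition -> member.name assignment:
// because the result depends only on the set of member names, every
// rebalance with the same names yields the same mapping, so a restarted
// static member reclaims its old partitions without the coordinator
// persisting anything.
class DeterministicAssignor {
    static Map<String, List<Integer>> assign(List<String> memberNames, int numPartitions) {
        // sort member names so the result is independent of join order
        List<String> sorted = new ArrayList<>(memberNames);
        sorted.sort(String::compareTo);
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String name : sorted) assignment.put(name, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            // round-robin over the sorted names
            assignment.get(sorted.get(p % sorted.size())).add(p);
        }
        return assignment;
    }
}
```

Under this scheme, the join order of the members is irrelevant: any two rebalances over the same name set produce identical assignments.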
> > 8) It is mentioned that "To distinguish between previous versions of
> > the protocol, we will also increase the join group request version to
> > v4 when MEMBER_NAME is set" and "If the broker version is not the
> > latest (< v4), the join group request shall be downgraded to v3
> > without setting the member Id". It is probably simpler to just say
> > that this feature is enabled if JoinGroupRequest V4 is supported on
> > both client and broker and MEMBER_NAME is configured with a non-empty
> > string.
> >
> > Yep, addressed this!
> >
> > 9) It is mentioned that the broker may return NO_STATIC_MEMBER_INFO_SET
> > error in OffsetCommitResponse for "commit requests under static
> > membership". Can you clarify how the broker determines whether the
> > commit request is under static membership?
> >
> > We have agreed that the commit request shouldn't be affected by the
> > new membership, thus we are removing it here. Thanks for catching
> > this!
> >
> > Let me know if you have further suggestions or concerns. Thank you for
> > your valuable feedback, which helps me design the KIP better! (And I
> > will try to address your feedback in the next round, Mayuresh!)
> >
> > Best,
> > Boyang
> > ________________________________
> > From: Mayuresh Gharat <gharatmayures...@gmail.com>
> > Sent: Wednesday, November 21, 2018 7:50 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > specifying member id
> >
> > Hi Boyang,
> >
> > Thanks for updating the KIP. This is a good step in the direction of
> > stateful applications and also mirroring applications whose latency is
> > affected due to the rebalance issues that we have today.
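For context on the client config being discussed throughout this thread, a static consumer setup would boil down to one extra setting. The sketch below is illustrative only: `member.name` is the config name proposed in this KIP draft (not a released Kafka config), and the bootstrap server and timeout values are placeholders.

```java
import java.util.Properties;

// Minimal sketch of a "static" consumer configuration under the KIP-345
// draft discussed in this thread. `member.name` is the proposed config
// name, not a released Kafka config; the value is any stable, user-chosen
// id that survives process restarts (e.g. derived from the host name).
class StaticConsumerConfigSketch {
    static Properties staticConsumerProps(String groupId, String memberName) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", groupId);
        props.setProperty("member.name", memberName); // proposed by KIP-345
        // Static members rely on a generous session timeout instead of eager
        // rebalances; the KIP loosens the broker-side cap to permit this.
        props.setProperty("session.timeout.ms", "600000");
        return props;
    }
}
```

The key property is that `memberName` stays the same across restarts of the same host, which is what lets the coordinator recognize a rejoining member.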
> > I had a few questions on the current version of the KIP:
> >
> > For the effectiveness of the KIP, consumers with member.name set will
> > *not send a leave group request* when they go offline.
> >
> > > By this you mean, even if the application has not called
> > > KafkaConsumer.poll() within session timeout, it will not be sending
> > > the LeaveGroup request, right?
> >
> > The broker will maintain an in-memory mapping of {member.name ->
> > member.id} to track member uniqueness.
> >
> > > When is the member.name removed from this map?
> >
> > Member.id must be set if the *member.name* is already within the map.
> > Otherwise reply MISSING_MEMBER_ID.
> >
> > > How is this case handled on the client side? What is the application
> > > that is using the KafkaConsumer supposed to do in this scenario?
> >
> > Session timeout is the timeout after which we will trigger a rebalance
> > when a member goes offline for too long (not sending heartbeat
> > requests). To make static membership effective, we should increase the
> > default max session timeout to 30 min so that the end user could
> > configure it freely.
> >
> > > This would mean that it might take more time to detect unowned topic
> > > partitions and may cause delay for applications that perform data
> > > mirroring tasks. I discussed this with our SREs and we have a
> > > suggestion to make here as listed below separately.
> >
> > Currently there is a config called *rebalance timeout* which is
> > configured by the consumer's *max.poll.intervals*. The reason we set
> > it to the poll interval is because the consumer can only send requests
> > within the call of poll() and we want to wait sufficient time for the
> > join group request.
> > When reaching the rebalance timeout, the group will move towards the
> > completingRebalance stage and remove unjoined groups
> >
> > > you meant remove unjoined members of the group, right ?
> >
> > Currently there is a config called *rebalance timeout* which is
> > configured by the consumer's *max.poll.intervals*. The reason we set
> > it to the poll interval is because the consumer can only send requests
> > within the call of poll() and we want to wait sufficient time for the
> > join group request. When reaching the rebalance timeout, the group
> > will move towards the completingRebalance stage and remove unjoined
> > groups. This is actually conflicting with the design of static
> > membership, because those temporarily unavailable members will
> > potentially reattempt the join group and trigger extra rebalances.
> > Internally we would optimize this logic by having rebalance timeout
> > only in charge of stopping the prepare rebalance stage, without
> > removing non-responsive members immediately.
> >
> > > What do you mean by "Internally we would optimize this logic by
> > > having rebalance timeout only in charge of stopping prepare rebalance
> > > stage, without removing non-responsive members immediately." There
> > > would not be a full rebalance if the lagging consumer sent a
> > > JoinGroup request later, right ? If yes, can you highlight this in
> > > the KIP ?
> >
> > Scale Up
> >
> > > The KIP talks about the scale up scenario but it's not quite clear
> > > how we handle it. Are we adding a separate "expansion.timeout" or
> > > are we adding status "learner"? Can you shed more light on how this
> > > is handled in the KIP, if it's handled?
> >
> > *Discussion*
> > Larger session timeouts causing latency rise for getting data for
> > un-owned topic partitions:
> >
> > > I think Jason had brought this up earlier about having a way to say
> > > how many members/consumer hosts are you choosing to be in the
> > > consumer group.
> > > If we can do this, then in case of mirroring applications we can do
> > > this:
> > > Let's say we have a mirroring application that consumes from Kafka
> > > cluster A and produces to Kafka cluster B.
> > > Depending on the data and the Kafka cluster configuration, Kafka
> > > service providers can set a mirroring group saying that it will take,
> > > for example, 300 consumer hosts/members to achieve the desired
> > > throughput and latency for mirroring, and can have an additional 10
> > > consumer hosts as spares in the same group.
> > > So the first 300 members/consumers to join the group will start
> > > mirroring the data from Kafka cluster A to Kafka cluster B.
> > > The remaining 10 consumer members can sit idle.
> > > The moment one of the consumers (for example: consumer number 54)
> > > from the first 300 members goes out of the group (crossed session
> > > timeout), it (the groupCoordinator) can just assign the
> > > topicPartitions from consumer member 54 to one of the spare hosts.
> > > Once consumer member 54 comes back up, it can start as being a part
> > > of the spare pool.
> > > This enables us to have lower session timeouts and low latency
> > > mirroring, in cases where the service providers are OK with having
> > > spare hosts.
> > > This would mean that we would tolerate n consumer members leaving and
> > > rejoining the group and still provide low latency as long as n <=
> > > number of spare consumers.
> > > If there are no spare hosts available, we can get back to the idea as
> > > described in the KIP.
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> > On Tue, Nov 20, 2018 at 10:18 AM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Hi Boyang.
> > >
> > > Thanks for preparing this KIP! It is making good progress and will
> > > be a great improvement for stateful Kafka applications.
> > >
> > > Apologies for my late reply, I was away for a while.
> > > Lots of great comments so far, so I'll probably second most of them
> > > in what I suggest below at this point.
> > >
> > > When I first read the KIP, I wanted to start at the end with
> > > something that wasn't highlighted a lot. That was the topic related
> > > to handling duplicate members. I see now that the initial suggestion
> > > of handling this situation during offset commit has been removed,
> > > and I agree with that. Issues related to membership seem to be
> > > handled better when the member joins the group rather than when it
> > > tries to commit offsets. This also simplifies how many request types
> > > need to change in order to incorporate the new member name field.
> > >
> > > I also agree with what Jason and Guozhang have said regarding
> > > timeouts. Although semantically it's easier to think of every
> > > operation having its own timeout, operationally this can become a
> > > burden. Thus, consolidation seems preferable here. The definition of
> > > embedded protocols on top of the base group membership protocol for
> > > rebalancing gives enough flexibility to address such needs in each
> > > client component separately.
> > >
> > > Finally, some minor comments:
> > > In a few places the new/proposed changes are referred to as
> > > "current", which is a bit confusing considering that there is a
> > > protocol in place already, and by "current" someone might understand
> > > the existing one. I'd recommend using new/proposed or equivalent
> > > when referring to changes introduced with KIP-345, and
> > > current/existing or equivalent when referring to existing behavior.
> > >
> > > There's the following sentence in the "Public Interfaces" section:
> > > "Since for many stateful consumer/stream applications, the state
> > > shuffling is more painful than short time partial unavailability."
> > > However, my understanding is that the changes proposed with KIP-345
> > > will not exploit any partial availability. A suggestion for dealing
> > > with temporary imbalances has been made in "Incremental Cooperative
> > > Rebalancing", which can work well with KIP-345, but here I don't see
> > > proposed changes that suggest that some resources (e.g. partitions)
> > > will keep being used while others will not be utilized. Thus, you
> > > might want to adjust this sentence. Correct me if I'm missing
> > > something related to that.
> > >
> > > In the rejected alternatives, under point 2) I read "we can copy the
> > > member id to the config files". I believe it means to say "member
> > > name" unless I'm missing something about reusing member ids. Also
> > > below I read: "By allowing consumers to optionally specifying a
> > > member id", which probably implies "member name" again. In a sense
> > > this section highlights a potential confusion between member name
> > > and member id. I wonder if we could come up with a better term for
> > > the new field. StaticTag, StaticLabel, or even StaticName are some
> > > suggestions that could potentially help with confusion between
> > > MemberId and MemberName and what corresponds to what. But I wouldn't
> > > like to disrupt the discussion with naming conventions too much at
> > > this point. I just mention it here as a thought.
> > >
> > > Looking forward to seeing the final details of this KIP. Great work
> > > so far!
> > >
> > > Konstantine
> > >
> > >
> > > On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen <bche...@outlook.com>
> > > wrote:
> > >
> > > > Thanks Guozhang for the great summary here, and I have been
> > > > following up on the action items.
> > > >
> > > > 1. I already updated the KIP to remove the expansion timeout and
> > > > registration timeout. Great to see them being addressed on the
> > > > client side!
> > > > 2.
> > > > I double checked the design and I believe that it is ok to have
> > > > both static members and dynamic members co-exist in the same
> > > > group. So the upgrade shouldn't be destructive, and we are
> > > > removing the two membership protocol switching APIs.
> > > > 3. I only have a question about this one. I'm still reading the
> > > > KafkaApis code here. Should I just use the same authorization
> > > > logic for ForceStaticRebalanceRequest as for JoinGroupRequest?
> > > > 4. I'm very excited to see this work with K8s! Like you suggested,
> > > > this feature could be better addressed in a separate KIP because
> > > > it is pretty independent. I could start drafting the KIP once the
> > > > current proposal is approved.
> > > > 5. I believe that we don't need fencing in the offset commit
> > > > request, since the duplicate member.name issue could be handled by
> > > > the join group request. We shall reject a join group with a known
> > > > member name but no member id (which means we already have an
> > > > active member using this identity).
> > > > 6. I agree to remove that internal config once we move forward
> > > > with static membership. And I already removed the entire section
> > > > from the KIP.
> > > >
> > > > Let me know if you have other concerns.
> > > >
> > > > Best,
> > > > Boyang
> > > > ________________________________
> > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > Sent: Tuesday, November 20, 2018 4:21 PM
> > > > To: dev
> > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > > rebalances by specifying member id
> > > >
> > > > Hello Boyang,
> > > >
> > > > Thanks a lot for the KIP! It is a great write-up and I appreciate
> > > > your patience answering the feedback from the community. I'd like
> > > > to add my 2 cents here:
> > > >
> > > > 1.
> > > > By introducing another two timeout configs, registration_timeout and expansion_timeout, we are effectively having four timeout configs: session timeout, rebalance timeout (configured as "max.poll.interval.ms" on the client side), and these two. Interplaying these timeout configs can be quite hard for users, so I'm wondering if we can simplify the situation with as few timeout configs as possible. Here is a concrete suggestion I'd like to propose:
> > > >
> > > > 1.a) Instead of introducing a registration_timeout in addition to the session_timeout for static members, we can just reuse the session_timeout and ask users to set it to a larger value when they upgrade a dynamic client to a static client by setting "member.name" at the same time. By default, the broker-side min.session.timeout is 6 seconds and max.session.timeout is 5 minutes, which seems reasonable to me (we can of course modify these broker configs to enlarge the valid interval if we want in practice). Then we should also consider removing the condition that marks a client as failed when the rebalance timeout is reached without its JoinGroup being received, so that the semantics of session_timeout and rebalance_timeout are completely separated: the former is only used to determine whether a consumer member of the group should be marked as failed and kicked out of the group, and the latter is only used to determine the longest time the coordinator should wait in the PREPARE_REBALANCE phase. In other words, if a member did not send its JoinGroup within the rebalance_timeout, we still include it in the new generation of the group and send its old subscription info to the leader for assignment.
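[Editor's note] The timeout separation proposed in 1.a) can be sketched in a few lines. This is a hypothetical illustration, not the actual GroupCoordinator code: the function name `complete_rebalance` and the member-state dicts are invented. The point it demonstrates is that eviction is decided only by the session timeout, while the rebalance timeout merely bounds the PREPARE_REBALANCE wait, with laggards kept on their previously known subscription.

```python
# Hypothetical sketch (not Kafka's coordinator implementation) of the
# separated timeout semantics: session_timeout alone decides eviction,
# rebalance_timeout only bounds how long PREPARE_REBALANCE waits.

def complete_rebalance(members, joined, now, rebalance_deadline):
    """members: name -> {"last_heartbeat": ts, "subscription": [...],
                         "session_timeout": secs}
    joined: name -> new subscription received during PREPARE_REBALANCE.
    Returns the new generation's name -> subscription map, or None if the
    coordinator should keep waiting."""
    if now < rebalance_deadline and set(joined) != set(members):
        return None  # deadline not reached and JoinGroups still outstanding
    generation = {}
    for name, state in members.items():
        # Eviction is decided *only* by the session timeout...
        if now - state["last_heartbeat"] > state["session_timeout"]:
            continue  # session expired: drop from the new generation
        # ...so a member that merely missed the JoinGroup deadline is kept,
        # using its old subscription for the leader's assignment.
        generation[name] = joined.get(name, state["subscription"])
    return generation
```

A member that missed the rebalance deadline but still heartbeats thus survives into the new generation, which is exactly why a bounced static member causes no extra rebalance.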
> > > > Later, if the member comes back with a Heartbeat request, we can still follow the normal path to bring it to the latest generation, while checking that the JoinGroup request it sends contains the same subscription info we previously used to assign the partitions (which should likely be the case in practice). In addition, we should let static members not send the LeaveGroup request when they are gracefully shut down, so that a static member can only leave the group if its session has timed out, OR it has been explicitly indicated to no longer be in the group (details below).
> > > >
> > > > 1.b) We have a parallel discussion about Incremental Cooperative Rebalancing, in which we encode the "when to rebalance" logic at the application level instead of at the protocol level. By doing this we can also enable a few other optimizations, e.g. at the Streams level, by first building up the state store as standby tasks and then triggering a second rebalance to actually migrate the active tasks, keeping the actual rebalance latency, and hence the unavailability window, small (https://issues.apache.org/jira/browse/KAFKA-6145). I'd propose we align KIP-345 with this idea, and hence not add the expansion_timeout at the protocol layer but only at the application's coordinator / assignor layer (Connect, Streams, etc.).
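[Editor's note] The graceful-shutdown behavior at the end of 1.a) can be summed up in a tiny sketch. The helper `requests_on_close` is hypothetical (no such client API exists); it only illustrates which protocol requests a closing consumer would send under this proposal.

```python
# Hypothetical sketch of the proposed close() behavior: a static member
# (one configured with member.name) skips LeaveGroup on shutdown, so its
# departure is only noticed via session timeout or an explicit admin request.

def requests_on_close(member_name=None):
    """Return the protocol requests a consumer would send while closing."""
    requests = ["OffsetCommit"]        # commit progress either way
    if member_name is None:            # dynamic member: leave eagerly,
        requests.append("LeaveGroup")  # triggering an immediate rebalance
    # static member: no LeaveGroup, so a bounce that finishes within the
    # session timeout causes no rebalance at all
    return requests
```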
> > > > We can still deprecate "group.initial.rebalance.delay.ms" as part of this KIP, though, since we have discussed its limits and think it is actually not a very good design and could be replaced with the client-side logic above.
> > > >
> > > > 2. I'd like to see your thoughts on the upgrade path for this KIP. More specifically, say we have upgraded the broker to a version that recognizes the new versions of the JoinGroup request and the admin requests; how should we then upgrade the clients and enable static groups? Off the top of my head, if we do a rolling bounce in which we set the member.name config (and optionally increase the session.timeout config) as we bounce each instance, then during this rolling bounce we will have a group containing both dynamic members and static members. It means the group must allow such a scenario (i.e. we cannot reject JoinGroup requests from dynamic members), and hence the "member.name" -> "member.id" mapping will only be partial in this scenario. Also, could you describe whether the upgrade to the first version that supports this feature would ever get any benefits, or whether only future rolling-bounce upgrades could benefit from this feature?
> > > >
> > > > If that's the case, and we do 1) as suggested above, do we still need the enableStaticMembership and enableDynamicMembership admin requests any more?
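[Editor's note] The mixed-group scenario in point 2 can be illustrated with a small sketch. The `GroupState` class and its fields are hypothetical, not the actual coordinator code: the point is that dynamic members get a fresh member.id on every join, static members reuse theirs, and so the name -> id mapping is only partially populated mid-bounce.

```python
import uuid

# Hypothetical sketch of a coordinator accepting both dynamic and static
# joins during the rolling bounce described above. Only bounced (static)
# instances populate the name -> id mapping, so it stays partial until the
# bounce completes.

class GroupState:
    def __init__(self):
        self.static_ids = {}  # member.name -> member.id (partial mid-bounce)
        self.members = set()  # every member.id currently in the group

    def handle_join(self, member_name=None):
        if member_name is None:
            member_id = str(uuid.uuid4())  # dynamic: fresh id on each join
        elif member_name in self.static_ids:
            member_id = self.static_ids[member_name]  # static rejoin: same id
        else:
            member_id = str(uuid.uuid4())
            self.static_ids[member_name] = member_id  # first static join
        self.members.add(member_id)
        return member_id
```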
> > > > Seems they are not necessary any more, as we will only have the notion of "dynamic or static members" that can co-exist in a group, with no notion of "dynamic or static groups", and hence these two requests are not needed.
> > > >
> > > > 3. We need to briefly talk about the implications for ACLs, as we introduce new admin requests that are related to a specific group.id. For example, we need to make sure that whoever created or joined the group can actually send admin requests for the group; otherwise, on a multi-tenant cluster, the application owners would need to bother the Kafka operators every time they want to send any admin request for their groups, which would be an operational nightmare.
> > > >
> > > > 4. I like Jason's suggestion of adding an optional field for the list of member names, and I'm wondering if that can be done as part of the forceStaticRebalance request: i.e. by passing a list of members, we enforce a rebalance immediately, since it indicates that some static member will be officially kicked out of the group and some new static members may be added. So, back to 1.a) above, a static member can only be kicked out of the group if a) its session (arguably a long period of time) has timed out, or b) this admin request explicitly states that it is no longer part of the group. As for execution, I'm fine with keeping it as future work of this KIP if you'd like to make its scope smaller.
> > > >
> > > > Following are minor comments:
> > > >
> > > > 5.
> > > > I'm not sure we need to include "member.name" as part of the OffsetCommitRequest for fencing purposes, as I think the memberId plus the generation number should be sufficient for fencing even with static members.
> > > >
> > > > 6. As mentioned above, if we agree to do 1) we can get rid of the "LEAVE_GROUP_ON_CLOSE_CONFIG" config.
> > > >
> > > > Guozhang
> > > >
> > > > On Sat, Nov 17, 2018 at 5:53 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hey Boyang,
> > > > >
> > > > > Thanks for the proposal! This is very useful. I have some comments below:
> > > > >
> > > > > 1) The motivation currently explicitly states that the goal is to improve performance for heavy-state applications. It seems the motivation can be made stronger with the following use case. Currently, for a MirrorMaker cluster with e.g. 100 MirrorMaker processes, it takes a long time to rolling-bounce the entire cluster: each MirrorMaker process restart triggers a rebalance, which currently pauses consumption of all partitions of the MirrorMaker cluster. With the change stated in this patch, as long as a MirrorMaker can restart within the specified timeout (e.g. 2 minutes), we only need a constant number of rebalances (e.g. for leader restart) for the entire rolling bounce, which will significantly improve the availability of the MirrorMaker pipeline. In my opinion, the main benefit of the KIP is to avoid unnecessary rebalances when the consumer process can be restarted soon, which helps performance even if the overhead of state shuffling for a given process is small.
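[Editor's note] The rolling-bounce arithmetic in point 1) can be made concrete with a toy model. The function below is purely illustrative (invented name and parameters, not a Kafka API): with dynamic membership every restart triggers at least one rebalance, while with static membership a bounce that finishes within the session timeout triggers none.

```python
# Illustrative (hypothetical) count of rebalances during a rolling bounce
# of an N-process group, comparing dynamic and static membership.

def rebalances_for_rolling_bounce(num_processes, static,
                                  restart_secs, session_timeout_secs):
    if not static:
        # dynamic: each process leaves and rejoins; count conservatively
        # as one rebalance per bounced process
        return num_processes
    # static: only a restart that exceeds the session timeout is noticed
    return sum(1 for _ in range(num_processes)
               if restart_secs > session_timeout_secs)
```

For the 100-process MirrorMaker example above, a 60-second restart under a 2-minute session timeout would cut roughly 100 rebalances down to zero.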
> > > > > 2) In order to simplify reading the KIP, can you follow the write-up style of other KIPs (e.g. KIP-98) and list the interface changes, such as new configs (e.g. registration timeout), new request/response types, new AdminClient APIs, and new error codes (e.g. DUPLICATE_STATIC_MEMBER)? Currently some of these are specified in the Proposed Changes section, which makes it a bit inconvenient to understand the new interface that will be exposed to users. The explanation of the current two-phase rebalance protocol can probably be moved out of the public interface section.
> > > > >
> > > > > 3) There are currently two versions of JoinGroupRequest in the KIP, and only one of them has the field memberId. This seems confusing.
> > > > >
> > > > > 4) It is mentioned in the KIP that "An admin API to force rebalance could be helpful here, but we will make a call once we finished the major implementation". So this seems to still be an open question in the current design. We probably want to agree on this before voting for the KIP.
> > > > >
> > > > > 5) The KIP currently adds the new config MEMBER_NAME for the consumer. Can you specify the name of the config key and the default value? Possible defaults include the empty string or null (similar to transactional.id in the producer config).
> > > > >
> > > > > 6) Regarding the use of the topic "static_member_map" to persist the member name map: currently, if the consumer coordinator broker goes offline, a rebalance is triggered and consumers will try to connect to the new coordinator.
> > > > > If these consumers can connect to the new coordinator within max.poll.interval.ms (5 minutes by default), and given that the broker can use a deterministic algorithm to determine the partition -> member_name mapping, each consumer should get assigned the same set of partitions without requiring state shuffling. So it is not clear whether we have a strong use case for this new logic. Can you clarify the benefit of using the topic "static_member_map" to persist the member name map?
> > > > >
> > > > > 7) Regarding the introduction of the expansionTimeoutMs config, it is mentioned that "we are using expansion timeout to replace rebalance timeout, which is configured by max.poll.intervals from client side, and using registration timeout to replace session timeout". Currently the default max.poll.interval.ms is 5 minutes, and there will be only one rebalance if all new consumers can join within 5 minutes. So it is not clear whether we have a strong use case for this new config. Can you explain the benefit of introducing it?
> > > > >
> > > > > 8) It is mentioned that "To distinguish between previous version of protocol, we will also increase the join group request version to v4 when MEMBER_NAME is set" and "If the broker version is not the latest (< v4), the join group request shall be downgraded to v3 without setting the member Id". It is probably simpler to just say that this feature is enabled if JoinGroupRequest v4 is supported on both client and broker and MEMBER_NAME is configured with a non-empty string.
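[Editor's note] The deterministic partition -> member_name mapping raised in point 6) can be illustrated with a small sketch. The `assign` function is hypothetical (round-robin over sorted member names is just one deterministic choice, not the KIP's prescribed algorithm): because the output depends only on the sorted names, every consumer gets the same partitions back after a coordinator change, with no persisted "static_member_map" topic needed.

```python
# Hypothetical deterministic assignor: round-robin partitions over the
# sorted member names. Any pure function of (partitions, names) would do;
# determinism is what makes the assignment survive coordinator failover.

def assign(partitions, member_names):
    order = sorted(member_names)
    assignment = {name: [] for name in order}
    for i, p in enumerate(sorted(partitions)):
        assignment[order[i % len(order)]].append(p)
    return assignment
```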
> > > > > 9) It is mentioned that the broker may return a NO_STATIC_MEMBER_INFO_SET error in the OffsetCommitResponse for "commit requests under static membership". Can you clarify how the broker determines whether a commit request is under static membership?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125