Hi Boyang,

Do you have a discussion thread for KIP-394, which you mentioned here?

Thanks,

Mayuresh

On Mon, Nov 26, 2018 at 4:52 AM Boyang Chen <bche...@outlook.com> wrote:

> Hey Dong, thanks for the follow-up here!
>
>
> 1) It is not very clear to the user what is the difference between
> member.name and client.id as both seems to be used to identify the
> consumer. I am wondering if it would be more intuitive to name it
> group.member.name (preferred choice since it matches the current group.id
> config name) or rebalance.member.name to explicitly show that the id is
> solely used for rebalance.
> Great question. I feel `member.name` explains itself well enough, and
> making the config name longer does not seem very helpful. Using `name`
> rather than `id` gives users the impression that they control the value
> with their own rule, rather than the library deciding it.
>
> 2) In the interface change section it is said that GroupMaxSessionTimeoutMs
> will be changed to 30 minutes. It seems to suggest that we will change the
> default value of this config. It does not seem necessary to increase the
> time of consumer failure detection when user doesn't use static membership.
> Also, say static membership is enabled, then this default config change
> will cause a partition to be unavailable for consumption for 30 minutes if
> there is hard consumer failure, which seems to be worse experience than
> having unnecessary rebalance (when this timeout is small), particularly for
> new users of Kafka. Could you explain more why we should make this change?
> We are not changing the default session timeout value. We are only
> changing the cap that the broker enforces on the maximum session timeout.
> So this change does not depend on which kind of membership the end user
> is using, and loosening the cap gives end users more flexibility in the
> trade-off between liveness and stability.
>
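
To make the cap-versus-default distinction concrete, here is a minimal Python sketch of the range check a broker applies to a requested session timeout. The function name and the constant values for the minimum, the old cap, and the consumer default are assumptions for illustration and are not stated in this thread; only the 30 minute cap comes from the discussion above.

```python
# Hypothetical model of the broker-side range check on a requested session
# timeout. All values are in milliseconds; names are illustrative only.

GROUP_MIN_SESSION_TIMEOUT_MS = 6_000          # assumed broker minimum
OLD_GROUP_MAX_SESSION_TIMEOUT_MS = 300_000    # assumed cap before the change (5 min)
NEW_GROUP_MAX_SESSION_TIMEOUT_MS = 1_800_000  # 30 min cap discussed in the KIP
DEFAULT_SESSION_TIMEOUT_MS = 10_000           # assumed consumer default, unchanged


def accepts_session_timeout(requested_ms, max_ms,
                            min_ms=GROUP_MIN_SESSION_TIMEOUT_MS):
    """Return True if the requested timeout lies within [min_ms, max_ms]."""
    return min_ms <= requested_ms <= max_ms


# The default is accepted under either cap, so existing users see no change.
default_ok_before = accepts_session_timeout(
    DEFAULT_SESSION_TIMEOUT_MS, OLD_GROUP_MAX_SESSION_TIMEOUT_MS)
default_ok_after = accepts_session_timeout(
    DEFAULT_SESSION_TIMEOUT_MS, NEW_GROUP_MAX_SESSION_TIMEOUT_MS)

# A 10 minute timeout is only accepted once the cap is raised.
ten_min_ok_before = accepts_session_timeout(
    600_000, OLD_GROUP_MAX_SESSION_TIMEOUT_MS)
ten_min_ok_after = accepts_session_timeout(
    600_000, NEW_GROUP_MAX_SESSION_TIMEOUT_MS)
```

Under these assumptions, raising the cap only widens what a user may opt into; a consumer that keeps the default timeout detects failures exactly as fast as before.
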
> 3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER
> into one error? It seems that these two errors are currently handled by the
> consumer in the same way. And we also don't expect MEMBER_ID_MISMATCH
> to happen. Thus it is not clear what is the benefit of having two errors.
> I agree that we should remove the DUPLICATE_STATIC_MEMBER error, because
> with KIP-394
> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member+id+for+initial+join+group+request>
> we will automatically fence all join requests with UNKNOWN_MEMBER_ID.
>
> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains
> member name which is already in the consumer group, however the member id
> was missing". After a consumer is restarted, it will send a
> JoinGroupRequest with an existing memberName (as the coordinator has not
> expired this member from the memory) and memberId
> = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted
> across consumer restart in the consumer side). Does it mean that
> JoinGroupRequest from a newly restarted consumer will always be rejected
> until the sessionTimeoutMs has passed?
> Same answer as question 3). This part of the logic shall be removed from
> the proposal.
>
> 5) It seems that we always add two methods to the interface
> org.apache.kafka.clients.admin.AdminClient.java, one with options and the
> other without option. Could this be specified in the interface change
> section?
> Sounds good! Added both methods.
>
> 6) Do we plan to have off-the-shelf command line tool for SRE to trigger
> rebalance? If so, we probably want to specify the command line tool
> interface similar to
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts
> .
> Added the script.
>
> 7) Would it be simpler to replace name "forceStaticRebalance" with
> "invokeConsumerRebalance"? It is not very clear what is the extra meaning
> of the word "force" as compared to "trigger" or "invoke". And it seems simpler
> to allow this API to trigger rebalance regardless of whether consumer is
> configured with memberName.
> Sounds good. Right now I feel for both static and dynamic membership it is
> more manageable to introduce the consumer rebalance method through admin
> client API.
>
> 8) It is not very clear how the newly added AdminClient API triggers
> rebalance. For example, does it send request? Can this be explained in the
> KIP?
>
> Sure, I will add more details to the API.
>
>
> Thanks again for the helpful suggestions!
>
>
> Best,
> Boyang
>
> ________________________________
> From: Dong Lin <lindon...@gmail.com>
> Sent: Saturday, November 24, 2018 2:54 PM
> To: dev
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hey Boyang,
>
> Thanks for the update! Here are some followup comments:
>
> 1) It is not very clear to the user what is the difference between
> member.name and client.id as both seems to be used to identify the
> consumer. I am wondering if it would be more intuitive to name it
> group.member.name (preferred choice since it matches the current group.id
> config name) or rebalance.member.name to explicitly show that the id is
> solely used for rebalance.
>
> 2) In the interface change section it is said that GroupMaxSessionTimeoutMs
> will be changed to 30 minutes. It seems to suggest that we will change the
> default value of this config. It does not seem necessary to increase the
> time of consumer failure detection when user doesn't use static membership.
> Also, say static membership is enabled, then this default config change
> will cause a partition to be unavailable for consumption for 30 minutes if
> there is hard consumer failure, which seems to be worse experience than
> having unnecessary rebalance (when this timeout is small), particularly for
> new users of Kafka. Could you explain more why we should make this change?
>
> 3) Could we just combine MEMBER_ID_MISMATCH and DUPLICATE_STATIC_MEMBER
> into one error? It seems that these two errors are currently handled by the
> consumer in the same way. And we also don't expect MEMBER_ID_MISMATCH
> to happen. Thus it is not clear what is the benefit of having two errors.
>
> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group contains
> member name which is already in the consumer group, however the member id
> was missing". After a consumer is restarted, it will send a
> JoinGroupRequest with an existing memberName (as the coordinator has not
> expired this member from the memory) and memberId
> = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not persisted
> across consumer restart in the consumer side). Does it mean that
> JoinGroupRequest from a newly restarted consumer will always be rejected
> until the sessionTimeoutMs has passed?
>
> 5) It seems that we always add two methods to the interface
> org.apache.kafka.clients.admin.AdminClient.java, one with options and the
> other without option. Could this be specified in the interface change
> section?
>
> 6) Do we plan to have off-the-shelf command line tool for SRE to trigger
> rebalance? If so, we probably want to specify the command line tool
> interface similar to
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories#KIP-113:Supportreplicasmovementbetweenlogdirectories-Scripts
> .
>
> 7) Would it be simpler to replace name "forceStaticRebalance" with
> "invokeConsumerRebalance"? It is not very clear what is the extra meaning
> of the word "force" as compared to "trigger" or "invoke". And it seems simpler
> to allow this API to trigger rebalance regardless of whether consumer is
> configured with memberName.
>
> 8) It is not very clear how the newly added AdminClient API triggers
> rebalance. For example, does it send request? Can this be explained in the
> KIP?
>
> Thanks,
> Dong
>
>
> On Thu, Nov 22, 2018 at 6:37 AM Boyang Chen <bche...@outlook.com> wrote:
>
> > Hey Mayuresh,
> >
> >
> > Thanks for your feedback! I will try to do another checklist here.
> >
> >
> > > By this you mean, even if the application has not called
> > > KafkaConsumer.poll() within session timeout, it will not be sending the
> > > LeaveGroup request, right?
> >
> > Yep, that's right: we will prevent clients from sending a leave group
> > request when they are configured with `member.name`.
> >
> >
> > > When is the member.name removed from this map?
> > Good question. Under static membership we will only remove a member when
> > its session times out. Let me update the KIP to state that clearly.
> >
> > > How is this case (missing member id) handled on the client side? What is
> > > the application that is using the KafkaConsumer supposed to do in this
> > > scenario?
> > I have extended the two exceptions within join group response V4.
> > Basically I define the corresponding action for both to be failing the
> > client application immediately, because so far it is unknown what kind of
> > client issue could trigger them. After the first version, we will keep
> > enhancing the error handling logic!
> >
> > > This would mean that it might take more time to detect unowned topic
> > > partitions and may cause delay for applications that perform data
> > > mirroring tasks. I discussed this with our SRE and we have a suggestion
> > > to make here as listed below separately.
> > The goal of extending the session timeout cap is to serve users with good
> > client-side monitoring tools that can auto-heal dead consumers very fast.
> > So it is optional (and up to each user) to extend the session timeout to a
> > reasonable number for their client scenario.
> >
> > > you meant remove unjoined members of the group, right ?
> > Yep, there is a typo. Thanks for catching this!
> >
> > > What do you mean by "Internally we would optimize this logic by having
> > > rebalance timeout only in charge of stopping prepare rebalance stage,
> > > without removing non-responsive members immediately." There would not
> > > be a full rebalance if the lagging consumer sent a JoinGroup request
> > > later, right? If yes, can you highlight this in the KIP?
> > No, there won't be. We want to limit the rebalance timeout to serve only
> > as a timer that ends the prepare rebalance stage. This way, late joining
> > static members will not trigger a further rebalance as long as they are
> > within the session timeout. I added your highlight to the KIP!
> >
> > > The KIP talks about scale up scenario but it's not quite clear how we
> > > handle it. Are we adding a separate "expansion.timeout" or are we adding
> > > status "learner"? Can you shed more light on how this is handled in the
> > > KIP, if it's handled?
> > Updated the KIP: we shall not cover the scale-up case in KIP-345, because
> > we believe the client side could handle this logic better.
> >
> > > I think Jason had brought this up earlier about having a way to say how
> > > many members/consumer hosts you are choosing to have in the consumer
> > > group. If we can do this, then in case of mirroring applications we can
> > > do this:
> > > Let's say we have a mirroring application that consumes from Kafka
> > > cluster A and produces to Kafka cluster B.
> > > Depending on the data and the Kafka cluster configuration, Kafka service
> > > providers can set up a mirroring group saying that it will take, for
> > > example, 300 consumer hosts/members to achieve the desired throughput
> > > and latency for mirroring, and can have an additional 10 consumer hosts
> > > as spares in the same group.
> > > So the first 300 members/consumers to join the group will start
> > > mirroring the data from Kafka cluster A to Kafka cluster B.
> > > The remaining 10 consumer members can sit idle.
> > > The moment one of the consumers (for example: consumer number 54) from
> > > the first 300 members goes out of the group (crossed session timeout),
> > > it (the groupCoordinator) can just assign the topicPartitions from
> > > consumer member 54 to one of the spare hosts.
> > > Once consumer member 54 comes back up, it can start as a part of the
> > > spare pool.
> > > This enables us to have lower session timeouts and low latency
> > > mirroring, in cases where the service providers are OK with having
> > > spare hosts.
> > > This would mean that we would tolerate n consumer members leaving and
> > > rejoining the group and still provide low latency as long as n <= number
> > > of spare consumers.
> > > If there are no spare hosts available, we can get back to the idea as
> > > described in the KIP.
> > Great idea! In fact, on top of static membership we could later introduce
> > APIs to set hard-coded client ids for the group and replace the dead
> > host, or, as you proposed, define spare hosts, which I understand as hot
> > backups. I will put both Jason's and your suggestions into a separate
> > section called "Future works". Note that this spare host idea may also be
> > solvable through the rebalance protocol, IMO.
> >
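
The spare host idea above can be sketched as a toy model in Python. This is only an illustration of the behaviour described in the quoted suggestion, not a proposed coordinator implementation; the class name, method names, and the `unowned` list are all hypothetical.

```python
from collections import deque


class SparePoolGroup:
    """Toy model: active members own partitions, spare members sit idle."""

    def __init__(self, assignments, spares):
        self.assignments = dict(assignments)  # active member -> partitions
        self.spares = deque(spares)           # idle members, oldest first
        self.unowned = []                     # partitions with no owner

    def on_session_timeout(self, member):
        """Move the failed member's partitions to a spare, if one exists.

        Returns the replacement member, or None when the spare pool is
        empty and the caller has to fall back to a regular rebalance.
        """
        partitions = self.assignments.pop(member)
        if not self.spares:
            self.unowned.extend(partitions)
            return None
        replacement = self.spares.popleft()
        self.assignments[replacement] = partitions
        return replacement

    def on_rejoin(self, member):
        """A member that comes back starts as part of the spare pool."""
        self.spares.append(member)


group = SparePoolGroup({"c54": ["t-0", "t-1"], "c55": ["t-2"]}, ["s1"])
first = group.on_session_timeout("c54")   # s1 takes over c54's partitions
group.on_rejoin("c54")                    # c54 returns as a spare
second = group.on_session_timeout("c55")  # c54 takes over c55's partitions
third = group.on_session_timeout("s1")    # no spare left
```

In this model the group absorbs failures without a full reassignment as long as the number of members that are down at once does not exceed the number of spares, which is the `n <= number of spare consumers` condition from the suggestion.
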
> > Thank you again for the great feedback!
> >
> > Boyang
> > ________________________________
> > From: Boyang Chen <bche...@outlook.com>
> > Sent: Thursday, November 22, 2018 3:39 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > specifying member id
> >
> > Hey Dong, sorry for missing your message. I couldn't find your email in
> > my thread, so I will just do a checklist here!
> >
> >
> > 1) The motivation currently explicitly states that the goal is to improve
> > performance for heavy state application. It seems that the motivation can
> > be stronger with the following use-case. Currently for a MirrorMaker
> > cluster with e.g. 100 MirrorMaker processes, it will take a long time to
> > rolling bounce the entire MirrorMaker cluster. Each MirrorMaker process
> > restart will trigger a rebalance, which currently pauses the consumption
> > of all partitions of the MirrorMaker cluster. With the change stated in
> > this patch, as long as a MirrorMaker can restart within the specified
> > timeout (e.g. 2 minutes), we only need a constant number of rebalances
> > (e.g. for leader restart) for the entire rolling bounce, which will
> > significantly improve the availability of the MirrorMaker pipeline. In my
> > opinion, the main benefit of the KIP is to avoid unnecessary rebalances
> > if the consumer process can be restarted soon, which helps performance
> > even if the overhead of state shuffling for a given process is small.
> >
> > I just rephrased this part and added it to the KIP. Thanks for making the
> > motivation more solid!
> >
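
A back-of-the-envelope sketch of the rolling bounce arithmetic from point 1, in Python. The function and parameter names are hypothetical. It assumes, as the paragraph does, one rebalance per process restart without static membership, and a constant number (here one, for the leader restart) when every process restarts within the timeout.

```python
def rebalances_during_rolling_bounce(num_processes, static_membership,
                                     restarts_within_timeout=True,
                                     leader_rebalances=1):
    """Estimate how many rebalances a full rolling bounce triggers."""
    if static_membership and restarts_within_timeout:
        # Restarted members keep their identity, so only the assumed
        # constant number of rebalances (e.g. leader restart) remains.
        return leader_rebalances
    # Otherwise every restart triggers its own rebalance.
    return num_processes


dynamic_count = rebalances_during_rolling_bounce(100, static_membership=False)
static_count = rebalances_during_rolling_bounce(100, static_membership=True)
slow_restart_count = rebalances_during_rolling_bounce(
    100, static_membership=True, restarts_within_timeout=False)
```

For the 100 process MirrorMaker example this gives 100 rebalances today against 1 with static membership, provided restarts stay within the timeout.
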
> > 2) In order to simplify the KIP reading, can you follow the writeup style
> > of other KIPs (e.g. KIP-98) and list the interface changes such as new
> > configs (e.g. registration timeout), new request/response, new
> > AdminClient API and new error code (e.g. DUPLICATE_STATIC_MEMBER)?
> > Currently some of these are specified in the Proposed Change section,
> > which makes it a bit inconvenient to understand the new interface that
> > will be exposed to the user. Explanation of the current two-phase
> > rebalance protocol can probably be moved out of the public interface
> > section.
> > This is a great suggestion! I just consolidated all the public API
> > changes, and the whole KIP looks much more organized!
> >
> > 3) There are currently two versions of JoinGroupRequest in the KIP and
> > only one of them has the field memberId. This seems confusing.
> > Yep, I already found this issue and fixed it.
> >
> > 4) It is mentioned in the KIP that "An admin API to force rebalance could
> > be helpful here, but we will make a call once we finished the major
> > implementation". So this seems to be still an open question in the
> > current design. We probably want to agree on this before voting for the
> > KIP.
> > We have finalized the idea that this API is needed.
> >
> > 5) The KIP currently adds new config MEMBER_NAME for consumer. Can you
> > specify the name of the config key and the default config value? Possible
> > default values include empty string or null (similar to transaction.id in
> > producer config).
> > I have defined the `member.name` in "New configuration" section.
> >
> > 6) Regarding the use of the topic "static_member_map" to persist member
> > name map, currently if the consumer coordinator broker goes offline, a
> > rebalance is triggered and consumers will try to connect to the new
> > coordinator. If these consumers can connect to the new coordinator within
> > max.poll.interval.ms, which by default is 5 minutes, given that broker can
> > use a deterministic algorithm to determine the partition -> member_name
> > mapping, each consumer should get assigned the same set of partitions
> > without requiring state shuffling. So it is not clear whether we have a
> > strong use-case for this new logic. Can you help clarify what is the
> > benefit of using topic "static_member_map" to persist member name map?
> > I have discussed with Guozhang offline, and I believe reusing the current
> > `__consumer_offsets` topic is a better and unified solution.
> >
> > 7) Regarding the introduction of the expensionTimeoutMs config, it is
> > mentioned that "we are using expansion timeout to replace rebalance
> > timeout, which is configured by max.poll.intervals from client side, and
> > using registration timeout to replace session timeout". Currently the
> > default max.poll.interval.ms is configured to be 5 minutes and there will
> > be only one rebalance if all new consumers can join within 5 minutes. So
> > it is not clear whether we have a strong use-case for this new config.
> > Can you explain what is the benefit of introducing this new config?
> > Previously our goal was to use expansion timeout as a workaround for
> > triggering multiple rebalances when scaling-up members do not join at the
> > same time. It has been decided that this will be addressed by a
> > client-side protocol change, so we will not introduce expansion timeout.
> >
> > 8) It is mentioned that "To distinguish between previous version of
> > protocol, we will also increase the join group request version to v4 when
> > MEMBER_NAME is set" and "If the broker version is not the latest (< v4),
> > the join group request shall be downgraded to v3 without setting the
> > member Id". It is probably simpler to just say that this feature is
> > enabled if JoinGroupRequest V4 is supported on both client and broker and
> > MEMBER_NAME is configured with non-empty string.
> > Yep, addressed this!
> >
> > 9) It is mentioned that broker may return NO_STATIC_MEMBER_INFO_SET error
> > in OffsetCommitResponse for "commit requests under static membership".
> > Can you clarify how broker determines whether the commit request is under
> > static membership?
> >
> > We have agreed that commit requests shouldn't be affected by the new
> > membership, so I am removing it here. Thanks for catching this!
> >
> > Let me know if you have further suggestions or concerns. Thank you for
> > your valuable feedback to help me design the KIP better! (And I will try
> > to address your feedback in the next round, Mayuresh.)
> >
> > Best,
> > Boyang
> > ________________________________
> > From: Mayuresh Gharat <gharatmayures...@gmail.com>
> > Sent: Wednesday, November 21, 2018 7:50 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > specifying member id
> >
> > Hi Boyang,
> >
> > Thanks for updating the KIP. This is a step in a good direction for
> > stateful applications and also mirroring applications whose latency is
> > affected due to the rebalance issues that we have today.
> >
> > I had a few questions on the current version of the KIP:
> > For the effectiveness of the KIP, consumer with member.name set will *not
> > send leave group request* when they go offline
> >
> > > By this you mean, even if the application has not called
> > > KafkaConsumer.poll() within session timeout, it will not be sending the
> > > LeaveGroup request, right?
> > >
> >
> > Broker will maintain an in-memory mapping of {member.name -> member.id} to
> > track member uniqueness.
> >
> > > When is the member.name removed from this map?
> > >
> >
> > Member.id must be set if the *member.name* is already
> > within the map. Otherwise reply MISSING_MEMBER_ID
> >
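
A minimal Python sketch of the mapping rule quoted just above, under stated assumptions. The class and method names are hypothetical, the id format is invented, and the mismatch branch is one guess at what MEMBER_ID_MISMATCH (mentioned earlier in this thread) would mean. The later reply in this thread also says the rejection of a known name without an id is being removed from the proposal, so treat this purely as an illustration of the rule as quoted.

```python
import uuid

UNKNOWN_MEMBER_ID = ""  # what a client sends before it has been given an id


class StaticMemberRegistry:
    """Toy in-memory {member.name -> member.id} map kept by the coordinator."""

    def __init__(self):
        self._ids_by_name = {}

    def join(self, member_name, member_id=UNKNOWN_MEMBER_ID):
        """Return (error, member_id) for a join request from a static member."""
        known_id = self._ids_by_name.get(member_name)
        if known_id is None:
            # First time this name is seen: hand out a fresh id.
            new_id = f"{member_name}-{uuid.uuid4()}"
            self._ids_by_name[member_name] = new_id
            return None, new_id
        if member_id == UNKNOWN_MEMBER_ID:
            # The name is already in the map but the request carries no id.
            return "MISSING_MEMBER_ID", UNKNOWN_MEMBER_ID
        if member_id != known_id:
            # Assumed meaning of the mismatch error; not spelled out here.
            return "MEMBER_ID_MISMATCH", UNKNOWN_MEMBER_ID
        return None, known_id

    def expire(self, member_name):
        """Drop the name once the member's session times out."""
        self._ids_by_name.pop(member_name, None)


registry = StaticMemberRegistry()
first_error, first_id = registry.join("consumer-1")
restart_result = registry.join("consumer-1")          # restarted, no id
rejoin_result = registry.join("consumer-1", first_id)  # same id again
```

The `restart_result` case is the one raised in question 4 of the thread: a restarted consumer that has lost its member id would be rejected until `expire` runs at session timeout.
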
> > > How is this case handled on the client side? What is the application
> > > that is using the KafkaConsumer supposed to do in this scenario?
> > >
> >
> > Session timeout is the timeout we will trigger rebalance when a member
> > goes offline for too long (not sending heartbeat request). To make static
> > membership effective, we should increase the default max session timeout
> > to 30 min so that end user could config it freely.
> >
> > > This would mean that it might take more time to detect unowned topic
> > > partitions and may cause delay for applications that perform data
> > > mirroring tasks. I discussed this with our SRE and we have a suggestion
> > > to make here as listed below separately.
> > >
> >
> > Currently there is a config called *rebalance timeout* which is
> > configured by consumer *max.poll.intervals*. The reason we set it to poll
> > interval is because consumer could only send request within the call of
> > poll() and we want to wait sufficient time for the join group request.
> > When reaching rebalance timeout, the group will move towards
> > completingRebalance stage and remove unjoined groups
> >
> > > you meant remove unjoined members of the group, right ?
> > >
> >
> > Currently there is a config called *rebalance timeout* which is
> > configured by consumer *max.poll.intervals*. The reason we set it to poll
> > interval is because consumer could only send request within the call of
> > poll() and we want to wait sufficient time for the join group request.
> > When reaching rebalance timeout, the group will move towards
> > completingRebalance stage and remove unjoined groups. This is actually
> > conflicting with the design of static membership, because those
> > temporarily unavailable members will potentially reattempt the join group
> > and trigger extra rebalances. Internally we would optimize this logic by
> > having rebalance timeout only in charge of stopping prepare rebalance
> > stage, without removing non-responsive members immediately.
> >
> > > What do you mean by "Internally we would optimize this logic by having
> > > rebalance timeout only in charge of stopping prepare rebalance stage,
> > > without removing non-responsive members immediately." There would not
> > > be a full rebalance if the lagging consumer sent a JoinGroup request
> > > later, right? If yes, can you highlight this in the KIP?
> > >
> >
> > Scale Up
> >
> > > The KIP talks about scale up scenario but it's not quite clear how we
> > > handle it. Are we adding a separate "expansion.timeout" or are we adding
> > > status "learner"? Can you shed more light on how this is handled in the
> > > KIP, if it's handled?
> > >
> >
> >
> > *Discussion*
> > Larger session timeouts causing latency rise for getting data for
> > un-owned topic partitions:
> >
> > > I think Jason had brought this up earlier about having a way to say how
> > > many members/consumer hosts you are choosing to have in the consumer
> > > group. If we can do this, then in case of mirroring applications we can
> > > do this:
> > > Let's say we have a mirroring application that consumes from Kafka
> > > cluster A and produces to Kafka cluster B.
> > > Depending on the data and the Kafka cluster configuration, Kafka service
> > > providers can set up a mirroring group saying that it will take, for
> > > example, 300 consumer hosts/members to achieve the desired throughput
> > > and latency for mirroring, and can have an additional 10 consumer hosts
> > > as spares in the same group.
> > > So the first 300 members/consumers to join the group will start
> > > mirroring the data from Kafka cluster A to Kafka cluster B.
> > > The remaining 10 consumer members can sit idle.
> > > The moment one of the consumers (for example: consumer number 54) from
> > > the first 300 members goes out of the group (crossed session timeout),
> > > it (the groupCoordinator) can just assign the topicPartitions from
> > > consumer member 54 to one of the spare hosts.
> > > Once consumer member 54 comes back up, it can start as a part of the
> > > spare pool.
> > > This enables us to have lower session timeouts and low latency
> > > mirroring, in cases where the service providers are OK with having
> > > spare hosts.
> > > This would mean that we would tolerate n consumer members leaving and
> > > rejoining the group and still provide low latency as long as n <= number
> > > of spare consumers.
> > > If there are no spare hosts available, we can get back to the idea as
> > > described in the KIP.
> > >
> >
> > Thanks,
> >
> > Mayuresh
> >
> >
> >
> >
> >
> > On Tue, Nov 20, 2018 at 10:18 AM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Hi Boyang.
> > >
> > > Thanks for preparing this KIP! It is making good progress and will be a
> > > great improvement for stateful Kafka applications.
> > >
> > > Apologies for my late reply, I was away for a while. Lots of great
> > > comments so far, so I'll probably second most of them in what I suggest
> > > below at this point.
> > >
> > > When I first read the KIP, I wanted to start at the end with something
> > > that wasn't highlighted a lot. That was the topic related to handling
> > > duplicate members. I see now that the initial suggestion of handling this
> > > situation during offset commit has been removed, and I agree with that.
> > > Issues related to membership seem to be handled better when the member
> > > joins the group rather than when it tries to commit offsets. This also
> > > simplifies how many request types need to change in order to incorporate
> > > the new member name field.
> > >
> > > I also agree with what Jason and Guozhang have said regarding timeouts.
> > > Although semantically, it's easier to think of every operation having
> > > its own timeout, operationally this can become a burden. Thus,
> > > consolidation seems preferable here. The definition of embedded
> > > protocols on top of the base group membership protocol for rebalancing
> > > gives enough flexibility to address such needs in each client component
> > > separately.
> > >
> > > Finally, some minor comments:
> > > In a few places the new/proposed changes are referred to as "current",
> > > which is a bit confusing considering that there is a protocol in place
> > > already, and by "current" someone might understand the existing one.
> > > I'd recommend using new/proposed or equivalent when referring to changes
> > > introduced with KIP-345 and current/existing or equivalent when
> > > referring to existing behavior.
> > >
> > > There's the following sentence in the "Public Interfaces" section:
> > > "Since for many stateful consumer/stream applications, the state
> > > shuffling is more painful than short time partial unavailability."
> > > However, my understanding is that the changes proposed with KIP-345
> > > will not exploit any partial availability. A suggestion for dealing with
> > > temporary imbalances has been made in "Incremental Cooperative
> > > Rebalancing" which can work well with KIP-345, but here I don't see
> > > proposed changes that suggest that some resources (e.g. partitions) will
> > > keep being used while others will not be utilized. Thus, you might want
> > > to adjust this sentence. Correct me if I'm missing something related to
> > > that.
> > >
> > > In the rejected alternatives, under point 2) I read "we can copy the
> > > member id to the config files". I believe it means to say "member name"
> > > unless I'm missing something about reusing member ids. Also below I
> > > read: "By allowing consumers to optionally specifying a member id" which
> > > probably implies "member name" again. In a sense this section highlights
> > > a potential confusion between member name and member id. I wonder if we
> > > could come up with a better term for the new field. StaticTag,
> > > StaticLabel, or even StaticName are some suggestions that could
> > > potentially help with confusion between MemberId and MemberName and what
> > > corresponds to what. But I wouldn't like to disrupt the discussion with
> > > naming conventions too much at this point. I just mention it here as a
> > > thought.
> > >
> > > Looking forward to seeing the final details of this KIP. Great work so
> > > far!
> > >
> > > Konstantine
> > >
> > >
> > > On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen <bche...@outlook.com>
> > > wrote:
> > >
> > > > Thanks Guozhang for the great summary here, and I have been following
> > > > up on the action items.
> > > >
> > > >
> > > >   1.  I already updated the KIP to remove the expansion timeout and
> > > > registration timeout. Great to see them being addressed on the client
> > > > side!
> > > >   2.  I double checked the design and I believe that it is OK to have
> > > > both static members and dynamic members co-exist in the same group. So
> > > > the upgrade shouldn't be destructive and we are removing the two
> > > > membership protocol switching APIs.
> > > >   3.  I only have a question about this one. I'm still reading the
> > > > KafkaApis code here. Should I just use the same authorization logic for
> > > > ForceStaticRebalanceRequest as JoinGroupRequest?
> > > >   4.  I'm very excited to see this work with K8s! Like you suggested,
> > > > this feature could be better addressed in a separate KIP because it is
> > > > pretty independent. I could start drafting the KIP once the current
> > > > proposal is approved.
> > > >   5.  I believe that we don't need fencing in offset commit request,
> > > > since the duplicate member.name issue could be handled by join group
> > > > request. We shall reject join group with known member name but no
> > > > member id (which means we already have an active member using this
> > > > identity).
> > > >   6.  I agree to remove that internal config once we move forward with
> > > > static membership. And I already removed the entire section from the
> > > > KIP.
> > > >
> > > > Let me know if you have other concerns.
> > > >
> > > > Best,
> > > > Boyang
> > > > ________________________________
> > > > From: Guozhang Wang <wangg...@gmail.com>
> > > > Sent: Tuesday, November 20, 2018 4:21 PM
> > > > To: dev
> > > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > > > > specifying member id
> > > >
> > > > Hello Boyang,
> > > >
> > > > > Thanks a lot for the KIP! It is a great write-up and I appreciate your
> > > > > patience in answering the feedback from the community. I'd like to add
> > > > > my 2 cents here:
> > > >
> > > > > 1. By introducing another two timeout configs, registration_timeout
> > > > > and expansion_timeout, we are effectively having four timeout
> > > > > configs: session timeout, rebalance timeout (configured as
> > > > > "max.poll.interval.ms" on client side), and these two. The interplay
> > > > > of these timeout configs can be quite hard for users to reason about,
> > > > > and hence I'm wondering if we can simplify the situation with as few
> > > > > timeout configs as possible. Here is a concrete suggestion I'd like
> > > > > to propose:
> > > >
> > > > > 1.a) Instead of introducing a registration_timeout in addition to the
> > > > > session_timeout for static members, we can just reuse the
> > > > > session_timeout and ask users to set it to a larger value when they
> > > > > are upgrading a dynamic client to a static client by setting the
> > > > > "member.name" at the same time. By default, the broker-side
> > > > > min.session.timeout is 6 seconds and max.session.timeout is 5 minutes,
> > > > > which seems reasonable to me (we can of course modify this broker
> > > > > config to enlarge the valid interval if we want in practice). And then
> > > > > we should also consider removing the condition for marking a client as
> > > > > failed if the rebalance timeout has been reached while the JoinGroup
> > > > > was not received, so that the semantics of session_timeout and
> > > > > rebalance_timeout are totally separated: the former is only used to
> > > > > determine if a consumer member of the group should be marked as failed
> > > > > and kicked out of the group, and the latter is only used to determine
> > > > > the longest time the coordinator should wait for the PREPARE_REBALANCE
> > > > > phase. In other words, if a member did not send the JoinGroup within
> > > > > the rebalance_timeout, we still include it in the new generation of
> > > > > the group and use its old subscription info to send to the leader for
> > > > > assignment. Later, if the member comes back with a HeartBeat request,
> > > > > we can still follow the normal path to bring it to the latest
> > > > > generation while checking that its sent JoinGroup request contains the
> > > > > same subscription info as we used to assign the partitions previously
> > > > > (which should likely be the case in practice). In addition, we should
> > > > > let static members not send the LeaveGroup request when they are
> > > > > gracefully shut down, so that a static member can only leave the
> > > > > group if its session has timed out, OR it has been indicated to not
> > > > > exist in the group any more (details below).
> > > >
> > > > > 1.b) We have a parallel discussion about Incremental Cooperative
> > > > > Rebalancing, in which we will encode the "when to rebalance" logic at
> > > > > the application level, instead of at the protocol level. By doing this
> > > > > we can also enable a few other optimizations, e.g. at the Streams
> > > > > level to first build up the state store as standby tasks and then
> > > > > trigger a second rebalance to actually migrate the active tasks while
> > > > > keeping the actual rebalance latency and hence unavailability window
> > > > > small (https://issues.apache.org/jira/browse/KAFKA-6145).
> > > > > I'd propose we align KIP-345 along with this idea, and hence do not
> > > > > add the expansion_timeout as part of the protocol layer, but only do
> > > > > that at the application's coordinator / assignor layer (Connect,
> > > > > Streams, etc). We can still deprecate
> > > > > "group.initial.rebalance.delay.ms" as part of this KIP, though, since
> > > > > we have discussed its limitations and think it is actually not a very
> > > > > good design and could be replaced with the client-side logic above.
> > > >
> > > >
> > > > > 2. I'd like to see your thoughts on the upgrade path for this KIP.
> > > > > More specifically, let's say after we have upgraded the broker version
> > > > > to be able to recognize the new versions of the JoinGroup request and
> > > > > the admin requests, how should we upgrade the clients and enable
> > > > > static groups? Off the top of my head, if we do a rolling bounce in
> > > > > which we set the member.name config as well as optionally increase the
> > > > > session.timeout config when we bounce each instance, then during this
> > > > > rolling bounce we will have a group containing both dynamic members
> > > > > and static members. It means that the group should allow such a
> > > > > scenario (i.e. we cannot reject JoinGroup requests from dynamic
> > > > > members), and hence the "member.name" -> "member.id" mapping will only
> > > > > be partial in this scenario. Also, could you describe whether the
> > > > > upgrade to the first version that supports this feature would ever get
> > > > > any benefits, or whether only future rolling bounces could benefit
> > > > > from this feature?
> > > >
> > > > > If that's the case and we will do 1) as suggested above, do we still
> > > > > need the enableStaticMembership and enableDynamicMembership admin
> > > > > requests any more? It seems they are no longer necessary, as we will
> > > > > only have the notion of "dynamic or static members" that can co-exist
> > > > > in a group while there is no notion of "dynamic or static groups".
> > > >
> > > >
> > > > > 3. We need to briefly talk about the implications for ACL as we
> > > > > introduce new admin requests that are related to a specific group.id.
> > > > > For example, we need to make sure that whoever created the group or
> > > > > joined the group can actually send admin requests for the group,
> > > > > otherwise the application owners need to bother the Kafka operators on
> > > > > a multi-tenant cluster every time they want to send any admin requests
> > > > > for their groups, which would be an operational nightmare.
> > > >
> > > >
> > > > > 4. I like Jason's suggestion of adding an optional field for the list
> > > > > of member names, and I'm wondering if that can be done as part of the
> > > > > forceStaticRebalance request: i.e. by passing a list of members, we
> > > > > will enforce a rebalance immediately since it indicates that some
> > > > > static member will be officially kicked out of the group and some new
> > > > > static members may be added. So back to 1.a) above, a static member
> > > > > can only be kicked out of the group if a) its session (arguably a long
> > > > > period of time) has timed out, or b) this admin request explicitly
> > > > > states that it is no longer part of the group. As for execution, I'm
> > > > > fine with keeping it as future work of this KIP if you'd like to make
> > > > > its scope smaller.
> > > >
> > > > Following are minor comments:
> > > >
> > > > > 5. I'm not sure if we need to include "member.name" as part of the
> > > > > OffsetCommitRequest for fencing purposes, as I think the memberId plus
> > > > > the generation number should be sufficient for fencing even with
> > > > > static members.
> > > > >
> > > > > 6. As mentioned above, if we agree to do 1) we can get rid of the
> > > > > "LEAVE_GROUP_ON_CLOSE_CONFIG" config.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > > > On Sat, Nov 17, 2018 at 5:53 PM Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hey Boyang,
> > > > >
> > > > > > Thanks for the proposal! This is very useful. I have some comments
> > > > > > below:
> > > > >
> > > > > > 1) The motivation currently explicitly states that the goal is to
> > > > > > improve performance for heavy state applications. It seems that the
> > > > > > motivation can be stronger with the following use-case. Currently,
> > > > > > for a MirrorMaker cluster with e.g. 100 MirrorMaker processes, it
> > > > > > takes a long time to do a rolling bounce of the entire MirrorMaker
> > > > > > cluster. Each MirrorMaker process restart will trigger a rebalance,
> > > > > > which currently pauses the consumption of all the partitions of the
> > > > > > MirrorMaker cluster. With the change stated in this patch, as long
> > > > > > as a MirrorMaker can restart within the specified timeout (e.g. 2
> > > > > > minutes), we only need a constant number of rebalances (e.g. for
> > > > > > leader restart) for the entire rolling bounce, which will
> > > > > > significantly improve the availability of the MirrorMaker pipeline.
> > > > > > In my opinion, the main benefit of the KIP is to avoid unnecessary
> > > > > > rebalances if the consumer process can be restarted soon, which
> > > > > > helps performance even if the overhead of state shuffling for a
> > > > > > given process is small.
> > > > >
> > > > > > 2) In order to simplify the KIP reading, can you follow the writeup
> > > > > > style of other KIPs (e.g. KIP-98) and list the interface changes
> > > > > > such as new configs (e.g. registration timeout), new
> > > > > > request/response, new AdminClient API and new error code (e.g.
> > > > > > DUPLICATE_STATIC_MEMBER)? Currently some of these are specified in
> > > > > > the Proposed Change section, which makes it a bit inconvenient to
> > > > > > understand the new interface that will be exposed to users. The
> > > > > > explanation of the current two-phase rebalance protocol can probably
> > > > > > be moved out of the public interface section.
> > > > >
> > > > > > 3) There are currently two versions of JoinGroupRequest in the KIP
> > > > > > and only one of them has the field memberId. This seems confusing.
> > > > >
> > > > > > 4) It is mentioned in the KIP that "An admin API to force rebalance
> > > > > > could be helpful here, but we will make a call once we finished the
> > > > > > major implementation". So this seems to be still an open question in
> > > > > > the current design. We probably want to agree on this before voting
> > > > > > for the KIP.
> > > > >
> > > > > > 5) The KIP currently adds a new config MEMBER_NAME for the consumer.
> > > > > > Can you specify the name of the config key and the default config
> > > > > > value? Possible default values include empty string or null (similar
> > > > > > to transactional.id in the producer config).
> > > > >
> > > > > > 6) Regarding the use of the topic "static_member_map" to persist the
> > > > > > member name map: currently, if the consumer coordinator broker goes
> > > > > > offline, a rebalance is triggered and consumers will try to connect
> > > > > > to the new coordinator. If these consumers can connect to the new
> > > > > > coordinator within max.poll.interval.ms, which by default is 5
> > > > > > minutes, then given that the broker can use a deterministic
> > > > > > algorithm to determine the partition -> member_name mapping, each
> > > > > > consumer should get assigned the same set of partitions without
> > > > > > requiring state shuffling. So it is not clear whether we have a
> > > > > > strong use-case for this new logic. Can you help clarify what the
> > > > > > benefit is of using the topic "static_member_map" to persist the
> > > > > > member name map?
> > > > >
> > > > > > 7) Regarding the introduction of the expansionTimeoutMs config, it
> > > > > > is mentioned that "we are using expansion timeout to replace
> > > > > > rebalance timeout, which is configured by max.poll.intervals from
> > > > > > client side, and using registration timeout to replace session
> > > > > > timeout". Currently the default max.poll.interval.ms is configured
> > > > > > to be 5 minutes and there will be only one rebalance if all new
> > > > > > consumers can join within 5 minutes. So it is not clear whether we
> > > > > > have a strong use-case for this new config. Can you explain what the
> > > > > > benefit of introducing this new config is?
> > > > >
> > > > > > 8) It is mentioned that "To distinguish between previous version of
> > > > > > protocol, we will also increase the join group request version to v4
> > > > > > when MEMBER_NAME is set" and "If the broker version is not the
> > > > > > latest (< v4), the join group request shall be downgraded to v3
> > > > > > without setting the member Id". It is probably simpler to just say
> > > > > > that this feature is enabled if JoinGroupRequest V4 is supported on
> > > > > > both client and broker and MEMBER_NAME is configured with a
> > > > > > non-empty string.
> > > > >
> > > > > > 9) It is mentioned that the broker may return a
> > > > > > NO_STATIC_MEMBER_INFO_SET error in OffsetCommitResponse for "commit
> > > > > > requests under static membership". Can you clarify how the broker
> > > > > > determines whether the commit request is under static membership?
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125
