Regarding Jason's question and Boyang's responses:

2) I once have a discussion about the LeaveGroupRequest for static members,
and the reason for not having it for static members is that we'd need to
make it a configurable behavior as well (i.e. the likelihood that a static
member may shutdown but come back later may be even larger than the
likelihood that a shutdown static member would not come back), and when a
shutdown is complete the instance cannot tell whether or not it will come
back by itself. And hence letting a third party (think: admin used by K8s
plugins) issuing a request to indicate static member changes would be more
plausible.

I think having an optional list of all the static members that are still in
the group, rather than the members to be removed since the latter looks a
bit less flexible to me, in the request is a good idea (remember we allow a
group to have both static and dynamic members at the same time, so when
receiving the request, we will only do the diff and add / remove the static
members directly only, while still let the dynamic members to try to
re-join the group with the rebalance timeout).

3) personally I favor "ids" over "names" :) Since we already have some
"ids" and hence it sounds more consistent, plus on the producer side we
have a `transactional.id` whose semantics is a bit similar to this one,
i.e. for unique distinguishment of a client which may comes and goes but
need to be persist over multiple "instance life-times".


Guozhang


On Tue, Nov 27, 2018 at 10:00 AM Mayuresh Gharat <gharatmayures...@gmail.com>
wrote:

> Hi Boyang,
>
> Thanks for the replies. Please find the follow up queries below.
>
>     5. Regarding "So in summary, *the member will only be removed due to
> session timeout*. We shall remove it from both in-memory static member name
> mapping and member list." If the rebalance is invoked manually using the
> the admin apis, how long should the group coordinator wait for the members
> of the group to send a JoinGroupRequest for participating in the rebalance?
> How is a lagging consumer handled?
> The plan is to disable member kick out when rebalance.timeout is reached,
> so basically we are not "waiting" any
> join group request from existing members; we shall just rebalance base on
> what we currently have within the group
> metadata. Lagging consumer will trigger rebalance later if session timeout
> > rebalance timeout.
>
> >
> Just wanted to understand this better. Lets take an example, say we have a
> > consumer group "GroupA" with 4 consumers  c1, c2, c3, c4.
> > Everything is running fine and suddenly C4 host has issues and it goes
> > down. Now we notice that we can still operate with c1, c2, c3 and don't
> > want to wait for
> > c4 to come back up. We use the admin api
> > "invokeConsumerRebalance("GroupA")".
> > Now the GroupCoordinator, will ask the members c1, c2, c3 to join the
> > group again (in there heartBeatResponse) as first step of rebalance.
> > Now lets say that c1, c2 immediately send a joinGroupRequest but c3 is
> > delayed. At this stage, if we are not "waiting" on any join group
> request,
> > few things can happen :
> >
> >    - c4's partitions are distributed only among c1,c2. c3 maintains its
> >    original assignment. c1, c2 will start processing the newly assigned
> >    partitions.
> >
> > OR
> >
> >    - c4's partitions are distributed among c1, c2, c3. c1 and c2 start
> >    processing the newly assigned partitions. c3 gets to know about the
> newly
> >    assigned partitions later when it sends the JoinGroupRequest (which
> was
> >    delayed).
> >
> > OR
> >
> >    - Will the rebalance do a complete reassignment, where c1, c2, c3 have
> >    to give up there partitions and all the partitions belonging to c1,
> c2, c3,
> >    c4 will be redistributed among c1, c2, c3 ? If this is the case, the
> >    GroupCoordinator needs to give some buffer time for c1, c2, c3 to
> revoke
> >    there partitions and rejoin the group.
> >
> > This is as per my understanding of how the KIP would work without
> changing
> > the underlying group coordination workflow. Please correct me if I
> > misunderstood something here.
> >
>
>
> - When we say that we would use invokeConsumerRebalance(groupId) to down
> scale, with the example in the above question, how will the
> GroupCoordinator know that c4 should be kicked out of the group since we
> are trying to invoke rebalance proactively without waiting for c4's session
> time out to expire. Should there be a way of telling the GroupCoordinator
> that consumer c4 has been kicked out of the groupId = "GroupA"?
>
> - Also it looks like the statement "If the `member.id` uses
> UNKNOWN_MEMBER_NAME, we shall always generate a new member id and replace
> the one within current map, if `group.member.name` is known. Also once we
> are done with KIP-394
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-394%3A+Require+member.id+for+initial+join+group+request
> >,
> all the join group requests are requiring `member.id` to physically enter
> the consumer group. This way the latest joined " is incomplete. Can you
> take a look at this?
> Also when we say "all the join group requests are requiring `member.id` to
> physically enter the consumer group." because a newly started consumer will
> not have a "member.id", I assume you mean, once the GroupCoordinator
> assigns a member.id to the newly started consumer, it has to use it for
> any
> future JoinGroupRequests. Is my understanding correct?
>
>
> Thanks,
>
> Mayuresh
>
> On Mon, Nov 26, 2018 at 9:20 PM Boyang Chen <bche...@outlook.com> wrote:
>
> > Thanks Mayuresh and Jason for your follow-ups! Let me try to answer both
> > in this reply.
> >
> >
> > >    1. Do you intend to have member.id is a static config like
> > member.name
> > >    after KIP-345 and KIP-394?
> >
> > No, we shall only rely on broker to allocate member.id for the consumer
> > instances. FYI, I already
> >
> > started the discussion thread for KIP-394 😊
> >
> > >    2. Regarding "On client side, we add a new config called MEMBER_NAME
> > in
> > >    ConsumerConfig. On consumer service init, if the MEMBER_NAME config
> is
> > > set,
> > >    we will put it in the initial join group request to identify itself
> > as a
> > >    static member (static membership); otherwise, we will still send
> > >    UNKNOWN_MEMBER_ID to ask broker for allocating a new random ID
> > (dynamic
> > >    membership)."
> > >       - What is the value of member_id sent in the first
> JoinGroupRequest
> > >       when member_name is set (using static rebalance)? Is it
> > > UNKNOW_MEMBER_ID?
> >
> > Yes, we could only use unknown member id. Actually this part of the
> > proposal is outdated,
> >
> > let me do another audit of the whole doc. Basically, it is currently
> > impossible to send `member.id`
> >
> > when consumer restarted. Sorry for the confusions!
> >
> > >    3. Regarding "we are requiring member.id (if not unknown) to match
> > the
> > >    value stored in cache, otherwise reply MEMBER_ID_MISMATCH. The edge
> > case
> > >    that if we could have members with the same `member.name` (for
> > example
> > >    mis-configured instances with a valid member.id but added a used
> > member
> > >    name on runtime). When member name has duplicates, we could refuse
> > join
> > >    request from members with an outdated `member.id` (since we update
> > the
> > >    mapping upon each join group request). In an edge case where the
> > client
> > >    hits this exception in the response, it is suggesting that some
> other
> > >    consumer takes its spot."
> > >       - The part of "some other consumer takes the spot" would be
> > >       intentional, right? Also when you say " The edge case that if we
> > >       could have members with the same `member.name` (for example
> > >       mis-configured instances *with a valid member.id <
> >
> https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Fmember.id&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=cGI1RvreeJNeHgWkXu6ZZ%2BIMzD8uR2y9OKaQKc7Vsf8%3D&amp;reserved=0
> > >
> > > *but
> > >       added a used member name on runtime).", what do you mean by
> *valid
> > >       member id* here? Does it mean that there exist a mapping of
> > >       member.name to member.id like *MemberA -> id1* on the
> > >       GroupCoordinator and this consumer is trying to join with *
> > > member.name
> > >       <
> >
> https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Fmember.name&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=zSs67QfaXFLcI%2FiWTB3nfHuQLCHeZZUdz5hvD%2Brh5ug%3D&amp;reserved=0
> >
> > = MemberB and member.id <
> >
> https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Fmember.id&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=cGI1RvreeJNeHgWkXu6ZZ%2BIMzD8uR2y9OKaQKc7Vsf8%3D&amp;reserved=0
> >
> > =
> > > id1 *
> > >       ?
> >
> > I would take Jason's advice that each time we have unknown member joining
> > the group, the broker will
> >
> > always assign a new and unique id to track its identity. In this way,
> > consumer with duplicate member name
> >
> > will be fenced.
> >
> > >    4. Depending on your explanation for point 2 and the point 3 above
> > >    regarding returning back MEMBER_ID_MISMATCH on having a matching
> > >    member_name but unknown member_id, if the consumer sends
> > > "UNKNOW_MEMBER_ID"
> > >    on the first JoinGroupRequest and relies on the GroupCoordinator to
> > > give it
> > >    a member_id, is the consumer suppose to remember member_id for
> > >    joinGroupRequests? If yes, how are restarts handled?
> >
> > Like explained above, we shall not materialize the member.id. Instead we
> > need to rely on broker to allocate
> >
> > a unique id for consumer just like what we have now.
> >
> > >    5. Regarding "So in summary, *the member will only be removed due to
> > >    session timeout*. We shall remove it from both in-memory static
> member
> > >    name mapping and member list."
> > >       - If the rebalance is invoked manually using the the admin apis,
> > how
> > >       long should the group coordinator wait for the members of the
> > > group to send
> > >       a JoinGroupRequest for participating in the rebalance? How is a
> > > lagging
> > >       consumer handled?
> >
> > The plan is to disable member kick out when rebalance.timeout is reached,
> > so basically we are not "waiting" any
> >
> > join group request from existing members; we shall just rebalance base on
> > what we currently have within the group
> >
> > metadata. Lagging consumer will trigger rebalance later if session
> timeout
> > > rebalance timeout.
> >
> > >    6. Another detail to take care is that we need to automatically take
> > the
> > >    hash of group id so that we know which broker to send this request
> to.
> > >       - I assume this should be same as the way we find the
> coordinator,
> > >       today right? If yes, should we specify it in the KIP ?
> >
> > Yep, it is. Add FindCoordinatorRequest logic to the script.
> >
> > >    7. Are there any specific failure scenarios when you say "other
> > >    potential failure cases."? It would be good to mention them
> > explicitly,
> > > if
> > >    you think there are any.
> >
> > Nah, I'm gonna remove it because it seems causing more confusion than
> > making my assumption clear, which is
> >
> > "there could be other failure cases that I can't enumerate now" 😊
> >
> > >    8. It would be good to have a rollback plan as you have for roll
> > forward
> > >    in the KIP.
> >
> > Great suggestion! Added a simple rollback plan.
> >
> >
> > Next is answering Jason's suggestions:
> >
> > 1. This may be the same thing that Mayuresh is asking about. I think the
> > suggestion in the KIP is that if a consumer sends JoinGroup with a member
> > name, but no member id, then we will return the current member id
> > associated with that name. It seems in this case that we wouldn't be able
> > to protect from having two consumers active with the same configured
> > member.name? For example, imagine that we had a consumer with
> member.name
> > =A
> > which is assigned member.id=1. Suppose it becomes a zombie and a new
> > instance starts up with member.name=A. If it is also assigned member.id
> =1,
> > then how can we detect the zombie if it comes back to life? Both
> instances
> > will have the same member.id.
> >
> > The goal is to avoid a rebalance on a rolling restart, but we still need
> to
> > fence previous members. I am wondering if we can generate a new
> member.id
> > every time we receive a request from a static member with an unknown
> member
> > id. If the old instance with the same member.name attempts any
> operation,
> > then it will be fenced with an UNKNOWN_MEMBER_ID error. As long as the
> > subscription of the new instance hasn't changed, then we can skip the
> > rebalance and return the current assignment without forcing a rebalance.
> >
> > The trick to making this work is in the error handling of the zombie
> > consumer. If the zombie simply resets its member.id and rejoins to get a
> > new one upon receiving the UNKNOWN_MEMBER_ID error, then it would end up
> > fencing the new member. We want to avoid this. There needs to be an
> > expectation for static members that the member.id of a static member
> will
> > not be changed except when a new member with the same member.name joins
> > the
> > group. Then we can treat UNKNOWN_MEMBER_ID as a fatal error for consumers
> > with static member names.
> >
> > Yep, I like this idea! Keep giving out refresh member.id when facing
> > anonymous request will definitely
> >
> > prevent processing bug due to duplicate consumers, however I don't think
> I
> > fully understand the 3rd paragraph where
> >
> > you mentioned  "There needs to be an expectation for static members that
> > the member.id of a static member will
> >
> > not be changed except when a new member with the same member.name joins
> > the group. "  How do you plan
> > to know whether this member is new member or old member? I feel even with
> > zombie consumer takes the ownership,
> > it should be detected very quickly (as MISMATCH_ID exception trigger
> > original consumer instance dies)
> > and end user will start to fix it right away. Is there any similar logic
> > we applied in fencing duplicate `transaction.id`?
> >
> > 2. The mechanics of the ConsumerRebalance API seem unclear to me. As far
> as
> > I understand it, it is used for scaling down a consumer group and somehow
> > bypasses normal session timeout expiration. I am wondering how critical
> > this piece is and whether we can leave it for future work. If not, then
> it
> > would be helpful to elaborate on its implementation. How would the
> > coordinator know which members to kick out of the group?
> >
> > This API is needed when we need to immediately trigger rebalance instead
> > of waiting session timeout
> >
> > or rebalance timeout (Emergent scale up/down). It is very necessary to
> > have it for
> >
> > management purpose because user could choose when to trigger rebalance
> > pretty freely,
> >
> > gaining more client side control.
> >
> > In the meanwhile I see your point that we need to actually have the
> > ability to kick out members that we plan
> >
> > to scale down fast (as rebalance timeout no longer kicks any offline
> > member out of the group), I will think of adding an optional
> >
> > list of members that are ready to be removed.
> >
> > Another idea is to let static member send `LeaveGroupRequest` when they
> > are going offline (either scale down or bouncing),
> >
> > and broker will cache this information as "OfflineMembers" without
> > triggering rebalance. When handling ConsumerRebalanceRequest broker will
> >
> > kick the static members that are currently offline and trigger rebalance
> > immediately. How does this plan sound?
> >
> > 3. I've been holding back on mentioning this, but I think we should
> > reconsider the name `member.name`. I think we want something that
> suggests
> > its expectation of uniqueness in the group. How about `group.instance.id
> `
> > to go along with `group.id`?
> >
> > Yea, Dong and Stanislav also mentioned this naming. I personally buy in
> > the namespace idea, and
> >
> > since we already use `member.name` in a lot of context, I decide to
> > rename the config to `group.member.name`
> >
> > which should be sufficient for solving all the concerns we have now.
> > Sounds good?
> >
> >
> > Thank you for your great suggestions! Let me know if my reply makes sense
> > her.
> >
> >
> > Best,
> >
> > Boyang
> >
> > ________________________________
> > From: Jason Gustafson <ja...@confluent.io>
> > Sent: Tuesday, November 27, 2018 7:51 AM
> > To: dev
> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> > specifying member id
> >
> > Hi Boyang,
> >
> > Thanks for the updates. Looks like we're headed in the right direction
> and
> > clearly the interest that this KIP is receiving shows how strong the
> > motivation is!
> >
> > I have a few questions:
> >
> > 1. This may be the same thing that Mayuresh is asking about. I think the
> > suggestion in the KIP is that if a consumer sends JoinGroup with a member
> > name, but no member id, then we will return the current member id
> > associated with that name. It seems in this case that we wouldn't be able
> > to protect from having two consumers active with the same configured
> > member.name? For example, imagine that we had a consumer with
> member.name
> > =A
> > which is assigned member.id=1. Suppose it becomes a zombie and a new
> > instance starts up with member.name=A. If it is also assigned member.id
> =1,
> > then how can we detect the zombie if it comes back to life? Both
> instances
> > will have the same member.id.
> >
> > The goal is to avoid a rebalance on a rolling restart, but we still need
> to
> > fence previous members. I am wondering if we can generate a new
> member.id
> > every time we receive a request from a static member with an unknown
> member
> > id. If the old instance with the same member.name attempts any
> operation,
> > then it will be fenced with an UNKNOWN_MEMBER_ID error. As long as the
> > subscription of the new instance hasn't changed, then we can skip the
> > rebalance and return the current assignment without forcing a rebalance.
> >
> > The trick to making this work is in the error handling of the zombie
> > consumer. If the zombie simply resets its member.id and rejoins to get a
> > new one upon receiving the UNKNOWN_MEMBER_ID error, then it would end up
> > fencing the new member. We want to avoid this. There needs to be an
> > expectation for static members that the member.id of a static member
> will
> > not be changed except when a new member with the same member.name joins
> > the
> > group. Then we can treat UNKNOWN_MEMBER_ID as a fatal error for consumers
> > with static member names.
> >
> > 2. The mechanics of the ConsumerRebalance API seem unclear to me. As far
> as
> > I understand it, it is used for scaling down a consumer group and somehow
> > bypasses normal session timeout expiration. I am wondering how critical
> > this piece is and whether we can leave it for future work. If not, then
> it
> > would be helpful to elaborate on its implementation. How would the
> > coordinator know which members to kick out of the group?
> >
> > 3. I've been holding back on mentioning this, but I think we should
> > reconsider the name `member.name`. I think we want something that
> suggests
> > its expectation of uniqueness in the group. How about `group.instance.id
> `
> > to go along with `group.id`?
> >
> > Thanks,
> > Jason
> >
> >
> >
> > On Mon, Nov 26, 2018 at 10:18 AM Mayuresh Gharat <
> > gharatmayures...@gmail.com>
> > wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks a lot for replying to all the queries and discussions here, so
> > > patiently.
> > > Really appreciate it.
> > >
> > > Had a few questions and suggestions after rereading the current version
> > of
> > > the KIP :
> > >
> > >
> > >    1. Do you intend to have member.id is a static config like
> > member.name
> > >    after KIP-345 and KIP-394?
> > >    2. Regarding "On client side, we add a new config called MEMBER_NAME
> > in
> > >    ConsumerConfig. On consumer service init, if the MEMBER_NAME config
> is
> > > set,
> > >    we will put it in the initial join group request to identify itself
> > as a
> > >    static member (static membership); otherwise, we will still send
> > >    UNKNOWN_MEMBER_ID to ask broker for allocating a new random ID
> > (dynamic
> > >    membership)."
> > >       - What is the value of member_id sent in the first
> JoinGroupRequest
> > >       when member_name is set (using static rebalance)? Is it
> > > UNKNOW_MEMBER_ID?
> > >    3. Regarding "we are requiring member.id (if not unknown) to match
> > the
> > >    value stored in cache, otherwise reply MEMBER_ID_MISMATCH. The edge
> > case
> > >    that if we could have members with the same `member.name` (for
> > example
> > >    mis-configured instances with a valid member.id but added a used
> > member
> > >    name on runtime). When member name has duplicates, we could refuse
> > join
> > >    request from members with an outdated `member.id` (since we update
> > the
> > >    mapping upon each join group request). In an edge case where the
> > client
> > >    hits this exception in the response, it is suggesting that some
> other
> > >    consumer takes its spot."
> > >       - The part of "some other consumer takes the spot" would be
> > >       intentional, right? Also when you say " The edge case that if we
> > >       could have members with the same `member.name` (for example
> > >       mis-configured instances *with a valid member.id <
> >
> https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Fmember.id&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=cGI1RvreeJNeHgWkXu6ZZ%2BIMzD8uR2y9OKaQKc7Vsf8%3D&amp;reserved=0
> > >
> > > *but
> > >       added a used member name on runtime).", what do you mean by
> *valid
> > >       member id* here? Does it mean that there exist a mapping of
> > >       member.name to member.id like *MemberA -> id1* on the
> > >       GroupCoordinator and this consumer is trying to join with *
> > > member.name
> > >       <
> >
> https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Fmember.name&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=zSs67QfaXFLcI%2FiWTB3nfHuQLCHeZZUdz5hvD%2Brh5ug%3D&amp;reserved=0
> >
> > = MemberB and member.id <
> >
> https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Fmember.id&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=cGI1RvreeJNeHgWkXu6ZZ%2BIMzD8uR2y9OKaQKc7Vsf8%3D&amp;reserved=0
> >
> > =
> > > id1 *
> > >       ?
> > >    4. Depending on your explanation for point 2 and the point 3 above
> > >    regarding returning back MEMBER_ID_MISMATCH on having a matching
> > >    member_name but unknown member_id, if the consumer sends
> > > "UNKNOW_MEMBER_ID"
> > >    on the first JoinGroupRequest and relies on the GroupCoordinator to
> > > give it
> > >    a member_id, is the consumer suppose to remember member_id for
> > >    joinGroupRequests? If yes, how are restarts handled?
> > >    5. Regarding "So in summary, *the member will only be removed due to
> > >    session timeout*. We shall remove it from both in-memory static
> member
> > >    name mapping and member list."
> > >       - If the rebalance is invoked manually using the the admin apis,
> > how
> > >       long should the group coordinator wait for the members of the
> > > group to send
> > >       a JoinGroupRequest for participating in the rebalance? How is a
> > > lagging
> > >       consumer handled?
> > >    6. Another detail to take care is that we need to automatically take
> > the
> > >    hash of group id so that we know which broker to send this request
> to.
> > >       - I assume this should be same as the way we find the
> coordinator,
> > >       today right? If yes, should we specify it in the KIP ?
> > >    7. Are there any specific failure scenarios when you say "other
> > >    potential failure cases."? It would be good to mention them
> > explicitly,
> > > if
> > >    you think there are any.
> > >    8. It would be good to have a rollback plan as you have for roll
> > forward
> > >    in the KIP.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Mon, Nov 26, 2018 at 8:17 AM Mayuresh Gharat <
> > > gharatmayures...@gmail.com>
> > > wrote:
> > >
> > > > Hi Boyang,
> > > >
> > > > Do you have a discuss thread for KIP-394 that you mentioned here ?
> > > >
> > > > Thanks,
> > > >
> > > > Mayuresh
> > > >
> > > > On Mon, Nov 26, 2018 at 4:52 AM Boyang Chen <bche...@outlook.com>
> > wrote:
> > > >
> > > >> Hey Dong, thanks for the follow-up here!
> > > >>
> > > >>
> > > >> 1) It is not very clear to the user what is the difference between
> > > >> member.name and client.id as both seems to be used to identify the
> > > >> consumer. I am wondering if it would be more intuitive to name it
> > > >> group.member.name (preferred choice since it matches the current
> > > group.id
> > > >> config name) or rebalance.member.name to explicitly show that the
> id
> > is
> > > >> solely used for rebalance.
> > > >> Great question. I feel `member.name` is enough to explain itself,
> it
> > > >> seems not very
> > > >> helpful to make the config name longer. Comparing `name` with `id`
> > gives
> > > >> user the
> > > >> impression that they have the control over it with customized rule
> > than
> > > >> library decided.
> > > >>
> > > >> 2) In the interface change section it is said that
> > > >> GroupMaxSessionTimeoutMs
> > > >> will be changed to 30 minutes. It seems to suggest that we will
> change
> > > the
> > > >> default value of this config. It does not seem necessary to increase
> > the
> > > >> time of consumer failure detection when user doesn't use static
> > > >> membership.
> > > >> Also, say static membership is enabled, then this default config
> > change
> > > >> will cause a partition to be unavailable for consumption for 30
> > minutes
> > > if
> > > >> there is hard consumer failure, which seems to be worse experience
> > than
> > > >> having unnecessary rebalance (when this timeout is small),
> > particularly
> > > >> for
> > > >> new users of Kafka. Could you explain more why we should make this
> > > change?
> > > >> We are not changing the default session timeout value. We are just
> > > >> changing the
> > > >> cap we are enforcing on the session timeout max value. So this
> change
> > is
> > > >> not affecting
> > > >> what kind of membership end user is using, and loosing the cap is
> > giving
> > > >> end user
> > > >> more flexibility on trade-off between liveness and stability.
> > > >>
> > > >> 3) Could we just combine MEMBER_ID_MISMATCH and
> > DUPLICATE_STATIC_MEMBER
> > > >> into one error? It seems that these two errors are currently handled
> > by
> > > >> the
> > > >> consumer in the same way. And we don't also don't expect
> > > >> MEMBER_ID_MISMATCH
> > > >> to happen. Thus it is not clear what is the benefit of having two
> > > errors.
> > > >> I agree that we should remove DUPLICATE_STATIC_MEMBER error because
> > with
> > > >> the KIP-394<
> > > >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-394%253A%2BRequire%2Bmember%2Bid%2Bfor%2Binitial%2Bjoin%2Bgroup%2Brequest&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=EDM7PmpOo2HenYhFHX2rxrszpkI7di401WhKh2Vjw5k%3D&amp;reserved=0
> > > >> >
> > > >> we will automatically fence all join requests with
> UNKNOWN_MEMBER_ID.
> > > >>
> > > >> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group
> > > contains
> > > >> member name which is already in the consumer group, however the
> member
> > > id
> > > >> was missing". After a consumer is restarted, it will send a
> > > >> JoinGroupRequest with an existing memberName (as the coordinator has
> > not
> > > >> expired this member from the memory) and memberId
> > > >> = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not
> persisted
> > > >> across consumer restart in the consumer side). Does it mean that
> > > >> JoinGroupRequest from a newly restarted consumer will always be
> > rejected
> > > >> until the sessionTimeoutMs has passed?
> > > >> Same answer as question 3). This part of the logic shall be removed
> > from
> > > >> the proposal.
> > > >>
> > > >> 5) It seems that we always add two methods to the interface
> > > >> org.apache.kafka.clients.admin.AdminClient.java, one with options
> and
> > > the
> > > >> other without option. Could this be specified in the interface
> change
> > > >> section?
> > > >> Sounds good! Added both methods.
> > > >>
> > > >> 6) Do we plan to have off-the-shelf command line tool for SRE to
> > trigger
> > > >> rebalance? If so, we probably want to specify the command line tool
> > > >> interface similar to
> > > >>
> > > >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-113%253A%2BSupport%2Breplicas%2Bmovement%2Bbetween%2Blog%2Bdirectories%23KIP-113%3ASupportreplicasmovementbetweenlogdirectories-Scripts&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=I1zGk61VzDvoGfPfzSbOms4l9g%2BrXQvttKsfNNwuuJ4%3D&amp;reserved=0
> > > >> .
> > > >> Added the script.
> > > >>
> > > >> 7) Would it be simpler to replace name "forceStaticRebalance" with
> > > >> "invokeConsumerRebalance"? It is not very clear what is the extra
> > > meaning
> > > >> of world "force" as compared to "trigger" or "invoke". And it seems
> > > >> simpler
> > > >> to allows this API to trigger rebalance regardless of whether
> consumer
> > > is
> > > >> configured with memberName.
> > > >> Sounds good. Right now I feel for both static and dynamic membership
> > it
> > > is
> > > >> more manageable to introduce the consumer rebalance method through
> > admin
> > > >> client API.
> > > >>
> > > >> 8) It is not very clear how the newly added AdminClient API trigger
> > > >> rebalance. For example, does it send request? Can this be explained
> in
> > > the
> > > >> KIP?
> > > >>
> > > >> Sure, I will add more details to the API.
> > > >>
> > > >>
> > > >> Thanks again for the helpful suggestions!
> > > >>
> > > >>
> > > >> Best,
> > > >> Boyang
> > > >>
> > > >> ________________________________
> > > >> From: Dong Lin <lindon...@gmail.com>
> > > >> Sent: Saturday, November 24, 2018 2:54 PM
> > > >> To: dev
> > > >> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances
> by
> > > >> specifying member id
> > > >>
> > > >> Hey Boyang,
> > > >>
> > > >> Thanks for the update! Here are some followup comments:
> > > >>
> > > >> 1) It is not very clear to the user what is the difference between
> > > >> member.name and client.id as both seems to be used to identify the
> > > >> consumer. I am wondering if it would be more intuitive to name it
> > > >> group.member.name (preferred choice since it matches the current
> > > group.id
> > > >> config name) or rebalance.member.name to explicitly show that the
> id
> > is
> > > >> solely used for rebalance.
> > > >>
> > > >> 2) In the interface change section it is said that
> > > >> GroupMaxSessionTimeoutMs
> > > >> will be changed to 30 minutes. It seems to suggest that we will
> change
> > > the
> > > >> default value of this config. It does not seem necessary to increase
> > the
> > > >> time of consumer failure detection when user doesn't use static
> > > >> membership.
> > > >> Also, say static membership is enabled, then this default config
> > change
> > > >> will cause a partition to be unavailable for consumption for 30
> > minutes
> > > if
> > > >> there is hard consumer failure, which seems to be worse experience
> > than
> > > >> having unnecessary rebalance (when this timeout is small),
> > particularly
> > > >> for
> > > >> new users of Kafka. Could you explain more why we should make this
> > > change?
> > > >>
> > > >> 3) Could we just combine MEMBER_ID_MISMATCH and
> > DUPLICATE_STATIC_MEMBER
> > > >> into one error? It seems that these two errors are currently handled
> > by
> > > >> the
> > > >> consumer in the same way. And we don't also don't expect
> > > >> MEMBER_ID_MISMATCH
> > > >> to happen. Thus it is not clear what is the benefit of having two
> > > errors.
> > > >>
> > > >> 4) The doc for DUPLICATE_STATIC_MEMBER says that "The join group
> > > contains
> > > >> member name which is already in the consumer group, however the
> member
> > > id
> > > >> was missing". After a consumer is restarted, it will send a
> > > >> JoinGroupRequest with an existing memberName (as the coordinator has
> > not
> > > >> expired this member from the memory) and memberId
> > > >> = JoinGroupRequest.UNKNOWN_MEMBER_ID (since memberId is not
> persisted
> > > >> across consumer restart in the consumer side). Does it mean that
> > > >> JoinGroupRequest from a newly restarted consumer will always be
> > rejected
> > > >> until the sessionTimeoutMs has passed?
> > > >>
> > > >> 5) It seems that we always add two methods to the interface
> > > >> org.apache.kafka.clients.admin.AdminClient.java, one with options
> and
> > > the
> > > >> other without option. Could this be specified in the interface
> change
> > > >> section?
> > > >>
> > > >> 6) Do we plan to have off-the-shelf command line tool for SRE to
> > trigger
> > > >> rebalance? If so, we probably want to specify the command line tool
> > > >> interface similar to
> > > >>
> > > >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fcwiki.apache.org%2Fconfluence%2Fdisplay%2FKAFKA%2FKIP-113%253A%2BSupport%2Breplicas%2Bmovement%2Bbetween%2Blog%2Bdirectories%23KIP-113%3ASupportreplicasmovementbetweenlogdirectories-Scripts&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=I1zGk61VzDvoGfPfzSbOms4l9g%2BrXQvttKsfNNwuuJ4%3D&amp;reserved=0
> > > >> .
> > > >>
> > > >> 7) Would it be simpler to replace name "forceStaticRebalance" with
> > > >> "invokeConsumerRebalance"? It is not very clear what is the extra
> > > meaning
> > > >> of world "force" as compared to "trigger" or "invoke". And it seems
> > > >> simpler
> > > >> to allows this API to trigger rebalance regardless of whether
> consumer
> > > is
> > > >> configured with memberName.
> > > >>
> > > >> 8) It is not very clear how the newly added AdminClient API trigger
> > > >> rebalance. For example, does it send request? Can this be explained
> in
> > > the
> > > >> KIP?
> > > >>
> > > >> Thanks,
> > > >> Dong
> > > >>
> > > >>
> > > >> On Thu, Nov 22, 2018 at 6:37 AM Boyang Chen <bche...@outlook.com>
> > > wrote:
> > > >>
> > > >> > Hey Mayuresh,
> > > >> >
> > > >> >
> > > >> > thanks for your feedbacks! I will try do another checklist here.
> > > >> >
> > > >> >
> > > >> > > By this you mean, even if the application has not called
> > > >> > > KafkaConsumer.poll() within session timeout, it will not be
> > sending
> > > >> the
> > > >> > > LeaveGroup request, right?
> > > >> >
> > > >> > Yep it's true, we will prevent client from sending leave group
> > request
> > > >> > when they are set with `member.name`.
> > > >> >
> > > >> >
> > > >> > > When is the member.name removed from this map?
> > > >> > Good question, we will only kick off member due to session timeout
> > > >> within
> > > >> > static membership. Let me update the KIP to clearly assert that.
> > > >> >
> > > >> > > How is this case (missing member id) handled on the client side?
> > > What
> > > >> is
> > > >> > the application that
> > > >> > > is using the KafkaConsumer suppose to do in this scenario?
> > > >> > I have extended the two exceptions within join group response V4.
> > > >> > Basically I define both corresponding actions to be immediate
> > failing
> > > >> > client application, because so far it is unknown what kind of
> client
> > > >> issue
> > > >> > could trigger them. After the first version, we will keep enhance
> > the
> > > >> error
> > > >> > handling logic!
> > > >> >
> > > >> > > This would mean that it might take more time to detect unowned
> > topic
> > > >> > > partitions and may cause delay for applications that perform
> data
> > > >> > mirroring
> > > >> > > tasks. I discussed this with our sre and we have a suggestion to
> > > make
> > > >> > here
> > > >> > > as listed below separately.
> > > >> > The goal of extending session timeout cap is for users with good
> > > client
> > > >> > side monitoring tools that could auto-heal the dead consumers very
> > > >> fast. So
> > > >> > it is optional (and personal) to extend session timeout to a
> > > reasonable
> > > >> > number with different client scenarios.
> > > >> >
> > > >> > > you meant remove unjoined members of the group, right ?
> > > >> > Yep, there is a typo. Thanks for catching this!
> > > >> >
> > > >> > > What do you mean by " Internally we would optimize this logic by
> > > >> having
> > > >> > > rebalance timeout only in charge of stopping prepare rebalance
> > > stage,
> > > >> > > without removing non-responsive members immediately." There
> would
> > > not
> > > >> be
> > > >> > a
> > > >> > > full rebalance if the lagging consumer sent a JoinGroup request
> > > later,
> > > >> > > right ? If yes, can you highlight this in the KIP ?
> > > >> > No, there won't be. We want to limit the rebalance timeout
> > > functionality
> > > >> > to only use as a timer to
> > > >> > end prepare rebalance stage. This way, late joining static members
> > > will
> > > >> > not trigger further rebalance
> > > >> > as long as they are within session timeout. I added your highlight
> > to
> > > >> the
> > > >> > KIP!
> > > >> >
> > > >> > > The KIP talks about scale up scenario but its not quite clear
> how
> > we
> > > >> > > handle it. Are we adding a separate "expansion.timeout" or we
> > adding
> > > >> > status
> > > >> > > "learner" ?. Can you shed more light on how this is handled in
> the
> > > >> KIP,
> > > >> > if
> > > >> > > its handled?
> > > >> > Updated the KIP: we shall not cover scale up case in 345, because
> we
> > > >> > believe client side could
> > > >> > better handle this logic.
> > > >> >
> > > >> > > I think Jason had brought this up earlier about having a way to
> > say
> > > >> how
> > > >> > > many members/consumer hosts are you choosing to be in the
> consumer
> > > >> group.
> > > >> > > If we can do this, then in case of mirroring applications we can
> > do
> > > >> this
> > > >> > :
> > > >> > > Lets say we have a mirroring application that consumes from
> Kafka
> > > >> cluster
> > > >> > > A and produces to Kafka cluster B.
> > > >> > > Depending on the data and the Kafka cluster configuration, Kafka
> > > >> service
> > > >> > > providers can set a mirroring group saying that it will take,
> for
> > > >> example
> > > >> > > 300 consumer hosts/members to achieve the desired throughput and
> > > >> latency
> > > >> > > for mirroring and can have additional 10 consumer hosts as spare
> > in
> > > >> the
> > > >> > > same group.
> > > >> > > So when the first 300 members/consumers to join the group will
> > start
> > > >> > > mirroring the data from Kafka cluster A to Kafka cluster B.
> > > >> > > The remaining 10 consumer members can sit idle.
> > > >> > > The moment one of the consumer (for example: consumer number 54)
> > > from
> > > >> the
> > > >> > > first 300 members go out of the group (crossed session timeout),
> > it
> > > >> (the
> > > >> > > groupCoordinator) can just assign the topicPartitions from the
> > > >> consumer
> > > >> > > member 54 to one of the spare hosts.
> > > >> > > Once the consumer member 54 comes back up, it can start as
> being a
> > > >> part
> > > >> > of
> > > >> > > the spare pool.
> > > >> > > This enables us to have lower session timeouts and low latency
> > > >> mirroring,
> > > >> > > in cases where the service providers are OK with having spare
> > hosts.
> > > >> > > This would mean that we would tolerate n consumer members
> leaving
> > > and
> > > >> > > rejoining the group and still provide low latency as long as n
> <=
> > > >> number
> > > >> > of
> > > >> > > spare consumers.
> > > >> > > If there are no spare host available, we can get back to the
> idea
> > as
> > > >> > > described in the KIP.
> > > >> > Great idea! In fact on top of static membership we could later
> > > introduce
> > > >> > APIs to set hard-coded
> > > >> > client ids to the group and replace the dead host, or as you
> > proposed
> > > to
> > > >> > define spare host as
> > > >> > what I understood as hot backup. I will put both Jason and your
> > > >> > suggestions into a separate section
> > > >> > called "Future works". Note that this spare host idea may be also
> > > >> solvable
> > > >> > through rebalance protocol
> > > >> > IMO.
> > > >> >
> > > >> > Thank you again for the great feedback!
> > > >> >
> > > >> > Boyang
> > > >> > ________________________________
> > > >> > From: Boyang Chen <bche...@outlook.com>
> > > >> > Sent: Thursday, November 22, 2018 3:39 PM
> > > >> > To: dev@kafka.apache.org
> > > >> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> rebalances
> > by
> > > >> > specifying member id
> > > >> >
> > > >> > Hey Dong, sorry for missing your message. I couldn't find your
> email
> > > on
> > > >> my
> > > >> > thread, so I will just do a checklist here!
> > > >> >
> > > >> >
> > > >> > 1) The motivation currently explicitly states that the goal is to
> > > >> improve
> > > >> >
> > > >> > performance for heavy state application. It seems that the
> > motivation
> > > >> can
> > > >> >
> > > >> > be stronger with the following use-case. Currently for MirrorMaker
> > > >> cluster
> > > >> >
> > > >> > with e.g. 100 MirrorMaker processes, it will take a long time to
> > > rolling
> > > >> >
> > > >> > bounce the entire MirrorMaker cluster. Each MirrorMaker process
> > > restart
> > > >> >
> > > >> > will trigger a rebalance which currently pause the consumption of
> > the
> > > >> all
> > > >> >
> > > >> > partitions of the MirrorMaker cluster. With the change stated in
> > this
> > > >> >
> > > >> > patch, as long as a MirrorMaker can restart within the specified
> > > timeout
> > > >> >
> > > >> > (e.g. 2 minutes), then we only need constant number of rebalance
> > (e.g.
> > > >> for
> > > >> >
> > > >> > leader restart) for the entire rolling bounce, which will
> > > significantly
> > > >> >
> > > >> > improves the availability of the MirrorMaker pipeline. In my
> > opinion,
> > > >> the
> > > >> >
> > > >> > main benefit of the KIP is to avoid unnecessary rebalance if the
> > > >> consumer
> > > >> >
> > > >> > process can be restarted within soon, which helps performance even
> > if
> > > >> >
> > > >> > overhead of state shuffling for a given process is small.
> > > >> >
> > > >> > I just rephrased this part and added it to the KIP. Thanks for
> > making
> > > >> the
> > > >> > motivation more solid!
> > > >> >
> > > >> > 2) In order to simplify the KIP reading, can you follow the
> writeup
> > > >> style
> > > >> > of other KIP (e.g. KIP-98) and list the interface change such as
> new
> > > >> > configs (e.g. registration timeout), new request/response, new
> > > >> AdminClient
> > > >> > API and new error code (e.g. DUPLICATE_STATIC_MEMBER)? Currently
> > some
> > > of
> > > >> > these are specified in the Proposed Change section which makes it
> a
> > > bit
> > > >> > inconvenient to understand the new interface that will be exposed
> to
> > > >> user.
> > > >> > Explanation of the current two-phase rebalance protocol probably
> can
> > > be
> > > >> > moved out of public interface section.
> > > >> > This is a great suggestion! I just consolidated all the public API
> > > >> > changes, and the whole KIP
> > > >> > looks much more organized!
> > > >> >
> > > >> > 3) There are currently two version of JoinGroupRequest in the KIP
> > and
> > > >> only
> > > >> > one of them has field memberId. This seems confusing.
> > > >> > Yep, I already found this issue and fixed it.
> > > >> >
> > > >> > 4) It is mentioned in the KIP that "An admin API to force
> rebalance
> > > >> could
> > > >> > be helpful here, but we will make a call once we finished the
> major
> > > >> > implementation". So this seems to be still an open question in the
> > > >> current
> > > >> > design. We probably want to agree on this before voting for the
> KIP.
> > > >> > We have finalized the idea that this API is needed.
> > > >> >
> > > >> > 5) The KIP currently adds new config MEMBER_NAME for consumer. Can
> > you
> > > >> > specify the name of the config key and the default config value?
> > > >> Possible
> > > >> > default values include empty string or null (similar to
> > > transaction.id<
> > > >> >
> > > >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Feur04.safelinks.protection.outlook.com%2F%3Furl%3Dhttp%253A%252F%252Ftransaction.id%26data%3D02%257C01%257C%257Cb48d52bf63324bd91a5208d64f43247d%257C84df9e7fe9f640afb435aaaaaaaaaaaa%257C1%257C0%257C636783547118328245%26sdata%3Db2d8sQWM8niJreqST7%252BJLcxfEyBmj7cJp4Lm5cYT57s%253D%26reserved%3D0&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=N8RxV6%2Bh7ib9CMpW3ZyFq3m2awY1sRPHzlOTi6qU5XY%3D&amp;reserved=0
> > > >> >
> > > >> > in
> > > >> > producer config).
> > > >> > I have defined the `member.name` in "New configuration" section.
> > > >> >
> > > >> > 6) Regarding the use of the topic "static_member_map" to persist
> > > member
> > > >> > name map, currently if consumer coordinator broker goes offline,
> > > >> rebalance
> > > >> > is triggered and consumers will try connect to the new
> coordinator.
> > If
> > > >> > these consumers can connect to the new coordinator within
> > > >> > max.poll.interval.ms<
> > > >> >
> > > >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Feur04.safelinks.protection.outlook.com%2F%3Furl%3Dhttp%253A%252F%252Fmax.poll.interval.ms%26data%3D02%257C01%257C%257Cb48d52bf63324bd91a5208d64f43247d%257C84df9e7fe9f640afb435aaaaaaaaaaaa%257C1%257C0%257C636783547118328245%26sdata%3DJWiSn5gQO5VNrmBov0KBdHpyVb4CiA0pFOAtLAlFqqY%253D%26reserved%3D0&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=F11nFuanrUtwwp1YVwvmYDsuV0mIs6QKt%2Bf2gPxD2t8%3D&amp;reserved=0
> > > >> >
> > > >> > which by default is 5 minutes, given that broker can
> > > >> > use a deterministic algorithm to determine the partition ->
> > > member_name
> > > >> > mapping, each consumer should get assigned the same set of
> > partitions
> > > >> > without requiring state shuffling. So it is not clear whether we
> > have
> > > a
> > > >> > strong use-case for this new logic. Can you help clarify what is
> the
> > > >> > benefit of using topic "static_member_map" to persist member name
> > map?
> > > >> > I have discussed with Guozhang offline, and I believe reusing the
> > > >> current
> > > >> > `_consumer_offsets`
> > > >> > topic is a better and unified solution.
> > > >> >
> > > >> > 7) Regarding the introduction of the expensionTimeoutMs config, it
> > is
> > > >> > mentioned that "we are using expansion timeout to replace
> rebalance
> > > >> > timeout, which is configured by max.poll.intervals from client
> side,
> > > and
> > > >> > using registration timeout to replace session timeout". Currently
> > the
> > > >> > default max.poll.interval.ms<
> > > >> >
> > > >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Feur04.safelinks.protection.outlook.com%2F%3Furl%3Dhttp%253A%252F%252Fmax.poll.interval.ms%26data%3D02%257C01%257C%257Cb48d52bf63324bd91a5208d64f43247d%257C84df9e7fe9f640afb435aaaaaaaaaaaa%257C1%257C0%257C636783547118328245%26sdata%3DJWiSn5gQO5VNrmBov0KBdHpyVb4CiA0pFOAtLAlFqqY%253D%26reserved%3D0&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=F11nFuanrUtwwp1YVwvmYDsuV0mIs6QKt%2Bf2gPxD2t8%3D&amp;reserved=0
> > > >> >
> > > >> > is configured to be 5 minutes and there will
> > > >> > be only one rebalance if all new consumers can join within 5
> > minutes.
> > > >> So it
> > > >> > is not clear whether we have a strong use-case for this new
> config.
> > > Can
> > > >> you
> > > >> > explain what is the benefit of introducing this new config?
> > > >> > Previously our goal is to use expansion timeout as a workaround
> for
> > > >> > triggering multiple
> > > >> > rebalances when scaling up members are not joining at the same
> time.
> > > It
> > > >> is
> > > >> > decided to
> > > >> > be addressed by client side protocol change, so we will not
> > introduce
> > > >> > expansion timeout.
> > > >> >
> > > >> > 8) It is mentioned that "To distinguish between previous version
> of
> > > >> > protocol, we will also increase the join group request version to
> v4
> > > >> when
> > > >> > MEMBER_NAME is set" and "If the broker version is not the latest
> (<
> > > v4),
> > > >> > the join group request shall be downgraded to v3 without setting
> the
> > > >> member
> > > >> > Id". It is probably simpler to just say that this feature is
> enabled
> > > if
> > > >> > JoinGroupRequest V4 is supported on both client and broker and
> > > >> MEMBER_NAME
> > > >> > is configured with non-empty string.
> > > >> > Yep, addressed this!
> > > >> >
> > > >> > 9) It is mentioned that broker may return
> NO_STATIC_MEMBER_INFO_SET
> > > >> error
> > > >> > in OffsetCommitResponse for "commit requests under static
> > membership".
> > > >> Can
> > > >> > you clarify how broker determines whether the commit request is
> > under
> > > >> > static membership?
> > > >> >
> > > >> > We have agreed that commit request shouldn't be affected by the
> new
> > > >> > membership, thus
> > > >> > removing it here. Thanks for catching this!
> > > >> >
> > > >> > Let me know if you have further suggestions or concerns. Thank you
> > for
> > > >> > your valuable feedback
> > > >> > to help me design the KIP better! (And I will try to address your
> > > >> > feedbacks in next round Mayuresh ??)
> > > >> >
> > > >> > Best,
> > > >> > Boyang
> > > >> > ________________________________
> > > >> > From: Mayuresh Gharat <gharatmayures...@gmail.com>
> > > >> > Sent: Wednesday, November 21, 2018 7:50 AM
> > > >> > To: dev@kafka.apache.org
> > > >> > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> rebalances
> > by
> > > >> > specifying member id
> > > >> >
> > > >> > Hi Boyang,
> > > >> >
> > > >> > Thanks for updating the KIP. This is a step good direction for
> > > stateful
> > > >> > applications and also mirroring applications whose latency is
> > affected
> > > >> due
> > > >> > to the rebalance issues that we have today.
> > > >> >
> > > >> > I had a few questions on the current version of the KIP :
> > > >> > For the effectiveness of the KIP, consumer with member.name set
> > will
> > > >> *not
> > > >> > send leave group request* when they go offline
> > > >> >
> > > >> > > By this you mean, even if the application has not called
> > > >> > > KafkaConsumer.poll() within session timeout, it will not be
> > sending
> > > >> the
> > > >> > > LeaveGroup request, right?
> > > >> > >
> > > >> >
> > > >> > Broker will maintain an in-memory mapping of {member.name ?
> > member.id
> > > }
> > > >> to
> > > >> > track member uniqueness.
> > > >> >
> > > >> > > When is the member.name removed from this map?
> > > >> > >
> > > >> >
> > > >> > Member.id must be set if the *member.name <
> > > >> >
> > > >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Fmember.name&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=zSs67QfaXFLcI%2FiWTB3nfHuQLCHeZZUdz5hvD%2Brh5ug%3D&amp;reserved=0
> > > >> >
> > > >> > *is already
> > > >> > within the map. Otherwise reply MISSING_MEMBER_ID
> > > >> >
> > > >> > > How is this case handled on the client side? What is the
> > application
> > > >> that
> > > >> > > is using the KafkaConsumer suppose to do in this scenario?
> > > >> > >
> > > >> >
> > > >> > Session timeout is the timeout we will trigger rebalance when a
> > member
> > > >> goes
> > > >> > offline for too long (not sending heartbeat request). To make
> static
> > > >> > membership effective, we should increase the default max session
> > > >> timeout to
> > > >> > 30 min so that end user could config it freely.
> > > >> >
> > > >> > > This would mean that it might take more time to detect unowned
> > topic
> > > >> > > partitions and may cause delay for applications that perform
> data
> > > >> > mirroring
> > > >> > > tasks. I discussed this with our sre and we have a suggestion to
> > > make
> > > >> > here
> > > >> > > as listed below separately.
> > > >> > >
> > > >> >
> > > >> > Currently there is a config called *rebalance timeout* which is
> > > >> configured
> > > >> > by consumer *max.poll.intervals*. The reason we set it to poll
> > > interval
> > > >> is
> > > >> > because consumer could only send request within the call of poll()
> > and
> > > >> we
> > > >> > want to wait sufficient time for the join group request. When
> > reaching
> > > >> > rebalance timeout, the group will move towards completingRebalance
> > > stage
> > > >> > and remove unjoined groups
> > > >> >
> > > >> > > you meant remove unjoined members of the group, right ?
> > > >> > >
> > > >> >
> > > >> > Currently there is a config called *rebalance timeout* which is
> > > >> configured
> > > >> > by consumer *max.poll.intervals*. The reason we set it to poll
> > > interval
> > > >> is
> > > >> > because consumer could only send request within the call of poll()
> > and
> > > >> we
> > > >> > want to wait sufficient time for the join group request. When
> > reaching
> > > >> > rebalance timeout, the group will move towards completingRebalance
> > > stage
> > > >> > and remove unjoined groups. This is actually conflicting with the
> > > >> design of
> > > >> > static membership, because those temporarily unavailable members
> > will
> > > >> > potentially reattempt the join group and trigger extra rebalances.
> > > >> > Internally we would optimize this logic by having rebalance
> timeout
> > > >> only in
> > > >> > charge of stopping prepare rebalance stage, without removing
> > > >> non-responsive
> > > >> > members immediately.
> > > >> >
> > > >> > > What do you mean by " Internally we would optimize this logic by
> > > >> having
> > > >> > > rebalance timeout only in charge of stopping prepare rebalance
> > > stage,
> > > >> > > without removing non-responsive members immediately." There
> would
> > > not
> > > >> be
> > > >> > a
> > > >> > > full rebalance if the lagging consumer sent a JoinGroup request
> > > later,
> > > >> > > right ? If yes, can you highlight this in the KIP ?
> > > >> > >
> > > >> >
> > > >> > Scale Up
> > > >> >
> > > >> > > The KIP talks about scale up scenario but its not quite clear
> how
> > we
> > > >> > > handle it. Are we adding a separate "expansion.timeout" or we
> > adding
> > > >> > status
> > > >> > > "learner" ?. Can you shed more light on how this is handled in
> the
> > > >> KIP,
> > > >> > if
> > > >> > > its handled?
> > > >> > >
> > > >> >
> > > >> >
> > > >> > *Discussion*
> > > >> > Larger session timeouts causing latency rise for getting data for
> > > >> un-owned
> > > >> > topic partitions :
> > > >> >
> > > >> > > I think Jason had brought this up earlier about having a way to
> > say
> > > >> how
> > > >> > > many members/consumer hosts are you choosing to be in the
> consumer
> > > >> group.
> > > >> > > If we can do this, then in case of mirroring applications we can
> > do
> > > >> this
> > > >> > :
> > > >> > > Lets say we have a mirroring application that consumes from
> Kafka
> > > >> cluster
> > > >> > > A and produces to Kafka cluster B.
> > > >> > > Depending on the data and the Kafka cluster configuration, Kafka
> > > >> service
> > > >> > > providers can set a mirroring group saying that it will take,
> for
> > > >> example
> > > >> > > 300 consumer hosts/members to achieve the desired throughput and
> > > >> latency
> > > >> > > for mirroring and can have additional 10 consumer hosts as spare
> > in
> > > >> the
> > > >> > > same group.
> > > >> > > So when the first 300 members/consumers to join the group will
> > start
> > > >> > > mirroring the data from Kafka cluster A to Kafka cluster B.
> > > >> > > The remaining 10 consumer members can sit idle.
> > > >> > > The moment one of the consumer (for example: consumer number 54)
> > > from
> > > >> the
> > > >> > > first 300 members go out of the group (crossed session timeout),
> > it
> > > >> (the
> > > >> > > groupCoordinator) can just assign the topicPartitions from the
> > > >> consumer
> > > >> > > member 54 to one of the spare hosts.
> > > >> > > Once the consumer member 54 comes back up, it can start as
> being a
> > > >> part
> > > >> > of
> > > >> > > the spare pool.
> > > >> > > This enables us to have lower session timeouts and low latency
> > > >> mirroring,
> > > >> > > in cases where the service providers are OK with having spare
> > hosts.
> > > >> > > This would mean that we would tolerate n consumer members
> leaving
> > > and
> > > >> > > rejoining the group and still provide low latency as long as n
> <=
> > > >> number
> > > >> > of
> > > >> > > spare consumers.
> > > >> > > If there are no spare host available, we can get back to the
> idea
> > as
> > > >> > > described in the KIP.
> > > >> > >
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Mayuresh
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Tue, Nov 20, 2018 at 10:18 AM Konstantine Karantasis <
> > > >> > konstant...@confluent.io> wrote:
> > > >> >
> > > >> > > Hi Boyang.
> > > >> > >
> > > >> > > Thanks for preparing this KIP! It is making good progress and
> will
> > > be
> > > >> a
> > > >> > > great improvement for stateful Kafka applications.
> > > >> > >
> > > >> > > Apologies for my late reply, I was away for a while. Lots of
> great
> > > >> > comments
> > > >> > > so far, so I'll probably second most of them in what I suggest
> > below
> > > >> at
> > > >> > > this point.
> > > >> > >
> > > >> > > When I first read the KIP, I wanted to start at the end with
> > > something
> > > >> > that
> > > >> > > wasn't highlighted a lot. That was the topic related to handling
> > > >> > duplicate
> > > >> > > members. I see now that the initial suggestion of handling this
> > > >> situation
> > > >> > > during offset commit has been removed, and I agree with that.
> > Issues
> > > >> > > related to membership seem to be handled better when the member
> > > joins
> > > >> the
> > > >> > > group rather than when it tries to commit offsets. This also
> > > >> simplifies
> > > >> > how
> > > >> > > many request types need to change in order to incorporate the
> new
> > > >> member
> > > >> > > name field.
> > > >> > >
> > > >> > > I also agree with what Jason and Guozhang have said regarding
> > > >> timeouts.
> > > >> > > Although semantically, it's easier to think of every operation
> > > having
> > > >> its
> > > >> > > own timeout, operationally this can become a burden. Thus,
> > > >> consolidation
> > > >> > > seems preferable here. The definition of embedded protocols on
> top
> > > of
> > > >> the
> > > >> > > base group membership protocol for rebalancing gives enough
> > > >> flexibility
> > > >> > to
> > > >> > > address such needs in each client component separately.
> > > >> > >
> > > >> > > Finally, some minor comments:
> > > >> > > In a few places the new/proposed changes are referred to as
> > > "current".
> > > >> > > Which is a bit confusing considering that there is a protocol in
> > > place
> > > >> > > already, and by "current" someone might understand the existing
> > one.
> > > >> I'd
> > > >> > > recommend using new/proposed or equivalent when referring to
> > changes
> > > >> > > introduced with KIP-345 and current/existing or equivalent when
> > > >> referring
> > > >> > > to existing behavior.
> > > >> > >
> > > >> > > There's the following sentence in the "Public Interfaces"
> section:
> > > >> > > "Since for many stateful consumer/stream applications, the state
> > > >> > shuffling
> > > >> > > is more painful than short time partial unavailability."
> > > >> > > However, my understanding is that the changes proposed with
> > KIP-345
> > > >> will
> > > >> > > not exploit any partial availability. A suggestion for dealing
> > with
> > > >> > > temporary imbalances has been made in "Incremental Cooperative
> > > >> > Rebalancing"
> > > >> > > which can work well with KIP-345, but here I don't see proposed
> > > >> changes
> > > >> > > that suggest that some resources (e.g. partitions) will keep
> being
> > > >> used
> > > >> > > while others will not be utilized. Thus, you might want to
> adjust
> > > this
> > > >> > > sentence. Correct me if I'm missing something related to that.
> > > >> > >
> > > >> > > In the rejected alternatives, under point 2) I read "we can copy
> > the
> > > >> > member
> > > >> > > id to the config files". I believe it means to say "member name"
> > > >> unless
> > > >> > I'm
> > > >> > > missing something about reusing member ids. Also below I read:
> "By
> > > >> > allowing
> > > >> > > consumers to optionally specifying a member id" which probably
> > > implies
> > > >> > > "member name" again. In a sense this section highlights a
> > potential
> > > >> > > confusion between member name and member id. I wonder if we
> could
> > > >> come up
> > > >> > > with a better term for the new field. StaticTag, StaticLabel, or
> > > even
> > > >> > > StaticName are some suggestions that could potentially help with
> > > >> > confusion
> > > >> > > between MemberId and MemberName and what corresponds to what.
> But
> > I
> > > >> > > wouldn't like to disrupt the discussion with naming conventions
> > too
> > > >> much
> > > >> > at
> > > >> > > this point. I just mention it here as a thought.
> > > >> > >
> > > >> > > Looking forward to see the final details of this KIP. Great work
> > so
> > > >> far!
> > > >> > >
> > > >> > > Konstantine
> > > >> > >
> > > >> > >
> > > >> > > On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen <
> bche...@outlook.com>
> > > >> wrote:
> > > >> > >
> > > >> > > > Thanks Guozhang for the great summary here, and I have been
> > > >> following
> > > >> > up
> > > >> > > > the action items here.
> > > >> > > >
> > > >> > > >
> > > >> > > >   1.  I already updated the KIP to remove the expansion
> timeout
> > > and
> > > >> > > > registration timeout. Great to see them being addressed in
> > client
> > > >> side!
> > > >> > > >   2.  I double checked the design and I believe that it is ok
> to
> > > >> have
> > > >> > > both
> > > >> > > > static member and dynamic member co-exist in the same group.
> So
> > > the
> > > >> > > upgrade
> > > >> > > > shouldn't be destructive and we are removing the two
> membership
> > > >> > protocol
> > > >> > > > switching APIs.
> > > >> > > >   3.  I only have question about this one. I'm still reading
> the
> > > >> > > KafkaApis
> > > >> > > > code here. Should I just use the same authorization logic for
> > > >> > > > ForceStaticRebalanceRequest as JoinGroupRequest?
> > > >> > > >   4.  I'm very excited to see this work with K8! Like you
> > > suggested,
> > > >> > this
> > > >> > > > feature could be better addressed in a separate KIP because it
> > is
> > > >> > pretty
> > > >> > > > independent. I could start drafting the KIP once the current
> > > >> proposal
> > > >> > is
> > > >> > > > approved.
> > > >> > > >   5.  I believe that we don't need fencing in offset commit
> > > request,
> > > >> > > since
> > > >> > > > duplicate member.name issue could be handled by join group
> > > >> request. We
> > > >> > > > shall reject join group with known member name but no member
> id
> > > >> (which
> > > >> > > > means we already have an active member using this identity).
> > > >> > > >   6.  I agree to remove that internal config once we move
> > forward
> > > >> with
> > > >> > > > static membership. And I already removed the entire section
> from
> > > the
> > > >> > KIP.
> > > >> > > >
> > > >> > > > Let me know if you have other concerns.
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Boyang
> > > >> > > > ________________________________
> > > >> > > > From: Guozhang Wang <wangg...@gmail.com>
> > > >> > > > Sent: Tuesday, November 20, 2018 4:21 PM
> > > >> > > > To: dev
> > > >> > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer
> > > rebalances
> > > >> by
> > > >> > > > specifying member id
> > > >> > > >
> > > >> > > > Hello Boyang,
> > > >> > > >
> > > >> > > > Thanks a lot for the KIP! It is a great write-up and I
> > appreciate
> > > >> your
> > > >> > > > patience answering to the feedbacks from the community. I'd
> like
> > > to
> > > >> add
> > > >> > > my
> > > >> > > > 2cents here:
> > > >> > > >
> > > >> > > > 1. By introducing another two timeout configs,
> > > registration_timeout
> > > >> and
> > > >> > > > expansion_timeout, we are effectively having four timeout
> > configs:
> > > >> > > session
> > > >> > > > timeout, rebalance timeout (configured as "
> max.poll.interval.ms
> > "
> > > on
> > > >> > > client
> > > >> > > > side), and these two. Interplaying these timeout configs can
> be
> > > >> quite
> > > >> > > hard
> > > >> > > > for users with such complexity, and hence I'm wondering if we
> > can
> > > >> > > simplify
> > > >> > > > the situation with as less possible timeout configs as
> possible.
> > > >> Here
> > > >> > is
> > > >> > > a
> > > >> > > > concrete suggestion I'd like propose:
> > > >> > > >
> > > >> > > > 1.a) Instead of introducing a registration_timeout in addition
> > to
> > > >> the
> > > >> > > > session_timeout for static members, we can just reuse the
> > > >> > session_timeout
> > > >> > > > and ask users to set it to a larger value when they are
> > upgrading
> > > a
> > > >> > > dynamic
> > > >> > > > client to a static client by setting the "member.name" at the
> > > same
> > > >> > time.
> > > >> > > > By
> > > >> > > > default, the broker-side min.session.timeout is 6 seconds and
> > > >> > > > max.session.timeout is 5 minutes, which seems reasonable to me
> > (we
> > > >> can
> > > >> > of
> > > >> > > > course modify this broker config to enlarge the valid interval
> > if
> > > we
> > > >> > want
> > > >> > > > in practice). And then we should also consider removing the
> > > >> condition
> > > >> > for
> > > >> > > > marking a client as failed if the rebalance timeout has
> reached
> > > >> while
> > > >> > the
> > > >> > > > JoinGroup was not received, so that the semantics of
> > > session_timeout
> > > >> > and
> > > >> > > > rebalance_timeout are totally separated: the former is only
> used
> > > to
> > > >> > > > determine if a consumer member of the group should be marked
> as
> > > >> failed
> > > >> > > and
> > > >> > > > kicked out of the group, and the latter is only used to
> > determine
> > > >> the
> > > >> > > > longest time coordinator should wait for PREPARE_REBALANCE
> > phase.
> > > In
> > > >> > > other
> > > >> > > > words if a member did not send the JoinGroup in time of the
> > > >> > > > rebalance_timeout, we still include it in the new generation
> of
> > > the
> > > >> > group
> > > >> > > > and use its old subscription info to send to leader for
> > > assignment.
> > > >> > Later
> > > >> > > > if the member came back with HeartBeat request, we can still
> > > follow
> > > >> the
> > > >> > > > normal path to bring it to the latest generation while
> checking
> > > that
> > > >> > its
> > > >> > > > sent JoinGroup request contains the same subscription info as
> we
> > > >> used
> > > >> > to
> > > >> > > > assign the partitions previously (which should be likely the
> > case
> > > in
> > > >> > > > practice). In addition, we should let static members to not
> send
> > > the
> > > >> > > > LeaveGroup request when it is gracefully shutdown, so that a
> > > static
> > > >> > > member
> > > >> > > > can only be leaving the group if its session has timed out, OR
> > it
> > > >> has
> > > >> > > been
> > > >> > > > indicated to not exist in the group any more (details below).
> > > >> > > >
> > > >> > > > 1.b) We have a parallel discussion about Incremental
> Cooperative
> > > >> > > > Rebalancing, in which we will encode the "when to rebalance"
> > logic
> > > >> at
> > > >> > the
> > > >> > > > application level, instead of at the protocol level. By doing
> > this
> > > >> we
> > > >> > can
> > > >> > > > also enable a few other optimizations, e.g. at the Streams
> level
> > > to
> > > >> > first
> > > >> > > > build up the state store as standby tasks and then trigger a
> > > second
> > > >> > > > rebalance to actually migrate the active tasks while keeping
> the
> > > >> actual
> > > >> > > > rebalance latency and hence unavailability window to be small
> (
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FKAFKA-6145&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=OAmsz8pz4JW%2BayjLqwk04E16G%2FTCF%2BbVk0LNB%2BUJgeY%3D&amp;reserved=0
> > > >> > > ).
> > > >> > > > I'd propose we align
> > > >> > > > KIP-345 along with this idea, and hence do not add the
> > > >> > expansion_timeout
> > > >> > > as
> > > >> > > > part of the protocol layer, but only do that at the
> > application's
> > > >> > > > coordinator / assignor layer (Connect, Streams, etc). We can
> > > still,
> > > >> > > > deprecate the "*group.initial.rebalance.delay.ms
> > > >> > > > <
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://nam03.safelinks.protection.outlook.com/?url=http%3A%2F%2Fgroup.initial.rebalance.delay.ms&amp;data=02%7C01%7C%7Cfa7f68b0cbc94390f04b08d653fa2832%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C636788731208857126&amp;sdata=S8Ds6UNM56%2B3rq%2F%2BvYwKdMmzXrH1B5D3ghyjm06hu4g%3D&amp;reserved=0
> > > >> > > >*"
> > > >> > > > though as part of this KIP
> > > >> > > > since we have discussed about its limit and think it is
> actually
> > > >> not a
> > > >> > > very
> > > >> > > > good design and could be replaced with client-side logic
> above.
> > > >> > > >
> > > >> > > >
> > > >> > > > 2. I'd like to see your thoughts on the upgrade path for this
> > KIP.
> > > >> More
> > > >> > > > specifically, let's say after we have upgraded broker version
> to
> > > be
> > > >> > able
> > > >> > > to
> > > >> > > > recognize the new versions of JoinGroup request and the admin
> > > >> requests,
> > > >> > > how
> > > >> > > > should we upgrade the clients and enable static groups? On top
> > of
> > > my
> > > >> > head
> > > >> > > > if we do a rolling bounce in which we set the member.name
> > config
> > > as
> > > >> > well
> > > >> > > > as
> > > >> > > > optionally increase the session.timeout config when we bounce
> > each
> > > >> > > > instance, then during this rolling bounces we will have a
> group
> > > >> > contained
> > > >> > > > with both dynamic members and static members. It means that we
> > > >> should
> > > >> > > have
> > > >> > > > the group to allow such scenario (i.e. we cannot reject
> > JoinGroup
> > > >> > > requests
> > > >> > > > from dynamic members), and hence the "member.name" -> "
> > member.id"
> > > >> > > mapping
> > > >> > > > will only be partial at this scenario. Also could you describe
> > if
> > > >> the
> > > >> > > > upgrade to the first version that support this feature would
> > ever
> > > >> get
> > > >> > any
> > > >> > > > benefits, or only the future upgrade path for rolling bounces
> > > could
> > > >> get
> > > >> > > > benefits out of this feature?
> > > >> > > >
> > > >> > > > If that's the case and we will do 1) as suggested above, do we
> > > still
> > > >> > need
> > > >> > > > the enableStaticMembership and enableDynamicMembership admin
> > > >> requests
> > > >> > any
> > > >> > > > more? Seems it is not necessary any more as we will only have
> > the
> > > >> > notion
> > > >> > > of
> > > >> > > > "dynamic or static members" that can co-exist in a group while
> > > >> there no
> > > >> > > > notion of "dynamic or static groups", and hence these two
> > requests
> > > >> are
> > > >> > > not
> > > >> > > > needed anymore.
> > > >> > > >
> > > >> > > >
> > > >> > > > 3. We need to briefly talk about the implications for ACL as
> we
> > > >> > introduce
> > > >> > > > new admin requests that are related to a specific group.id.
> For
> > > >> > example,
> > > >> > > > we
> > > >> > > > need to make sure that whoever created the group or joined the
> > > group
> > > >> > can
> > > >> > > > actually send admin requests for the group, otherwise the
> > > >> application
> > > >> > > > owners need to bother the Kafka operators on a multi-tenant
> > > cluster
> > > >> > every
> > > >> > > > time they want to send any admin requests for their groups
> which
> > > >> would
> > > >> > be
> > > >> > > > an operational nightmare.
> > > >> > > >
> > > >> > > >
> > > >> > > > 4. I like Jason's suggestion of adding an optional field for
> the
> > > >> list
> > > >> > of
> > > >> > > > member names, and I'm wondering if that can be done as part of
> > the
> > > >> > > > forceStaticRebalance request: i.e. by passing a list of
> members,
> > > we
> > > >> > will
> > > >> > > > enforce a rebalance immediately since it indicates that some
> > > static
> > > >> > > member
> > > >> > > > will be officially kicked out of the group and some new static
> > > >> members
> > > >> > > may
> > > >> > > > be added. So back to 1.a) above, a static member can only be
> > > kicked
> > > >> out
> > > >> > > of
> > > >> > > > the group if a) its session (arguably long period of time) has
> > > timed
> > > >> > out,
> > > >> > > > and b) this admin request explicitly state that it is no
> longer
> > > >> part of
> > > >> > > the
> > > >> > > > group. As for execution I'm fine with keeping it as a future
> > work
> > > of
> > > >> > this
> > > >> > > > KIP if you'd like to make its scope smaller.
> > > >> > > >
> > > >> > > > Following are minor comments:
> > > >> > > >
> > > >> > > > 5. I'm not sure if we need to include "member.name" as part
> of
> > > the
> > > >> > > > OffsetCommitRequest for fencing purposes, as I think the
> > memberId
> > > >> plus
> > > >> > > the
> > > >> > > > generation number should be sufficient for fencing even with
> > > static
> > > >> > > > members.
> > > >> > > >
> > > >> > > > 6. As mentioned above, if we agree to do 1) we can get rid of
> > the
> > > "
> > > >> > > > LEAVE_GROUP_ON_CLOSE_CONFIG" config.
> > > >> > > >
> > > >> > > >
> > > >> > > > Guozhang
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > >
> > > >> > > > On Sat, Nov 17, 2018 at 5:53 PM Dong Lin <lindon...@gmail.com
> >
> > > >> wrote:
> > > >> > > >
> > > >> > > > > Hey Boyang,
> > > >> > > > >
> > > >> > > > > Thanks for the proposal! This is very useful. I have some
> > > comments
> > > >> > > below:
> > > >> > > > >
> > > >> > > > > 1) The motivation currently explicitly states that the goal
> is
> > > to
> > > >> > > improve
> > > >> > > > > performance for heavy state application. It seems that the
> > > >> motivation
> > > >> > > can
> > > >> > > > > be stronger with the following use-case. Currently for
> > > MirrorMaker
> > > >> > > > cluster
> > > >> > > > > with e.g. 100 MirrorMaker processes, it will take a long
> time
> > to
> > > >> > > rolling
> > > >> > > > > bounce the entire MirrorMaker cluster. Each MirrorMaker
> > process
> > > >> > restart
> > > >> > > > > will trigger a rebalance which currently pause the
> consumption
> > > of
> > > >> the
> > > >> > > all
> > > >> > > > > partitions of the MirrorMaker cluster. With the change
> stated
> > in
> > > >> this
> > > >> > > > > patch, as long as a MirrorMaker can restart within the
> > specified
> > > >> > > timeout
> > > >> > > > > (e.g. 2 minutes), then we only need constant number of
> > rebalance
> > > >> > (e.g.
> > > >> > > > for
> > > >> > > > > leader restart) for the entire rolling bounce, which will
> > > >> > significantly
> > > >> > > > > improves the availability of the MirrorMaker pipeline. In my
> > > >> opinion,
> > > >> > > the
> > > >> > > > > main benefit of the KIP is to avoid unnecessary rebalance if
> > the
> > > >> > > consumer
> > > >> > > > > process can be restarted within soon, which helps
> performance
> > > >> even if
> > > >> > > > > overhead of state shuffling for a given process is small.
> > > >> > > > >
> > > >> > > > > 2) In order to simplify the KIP reading, can you follow the
> > > >> writeup
> > > >> > > style
> > > >> > > > > of other KIP (e.g. KIP-98) and list the interface change
> such
> > as
> > > >> new
> > > >> > > > > configs (e.g. registration timeout), new request/response,
> new
> > > >> > > > AdminClient
> > > >> > > > > API and new error code (e.g. DUPLICATE_STATIC_MEMBER)?
> > Currently
> > > >> some
> > > >> > > of
> > > >> > > > > these are specified in the Proposed Change section which
> makes
> > > it
> > > >> a
> > > >> > bit
> > > >> > > > > inconvenient to understand the new interface that will be
> > > exposed
> > > >> to
> > > >> > > > user.
> > > >> > > > > Explanation of the current two-phase rebalance protocol
> > probably
> > > >> can
> > > >> > be
> > > >> > > > > moved out of public interface section.
> > > >> > > > >
> > > >> > > > > 3) There are currently two version of JoinGroupRequest in
> the
> > > KIP
> > > >> and
> > > >> > > > only
> > > >> > > > > one of them has field memberId. This seems confusing.
> > > >> > > > >
> > > >> > > > > 4) It is mentioned in the KIP that "An admin API to force
> > > >> rebalance
> > > >> > > could
> > > >> > > > > be helpful here, but we will make a call once we finished
> the
> > > >> major
> > > >> > > > > implementation". So this seems to be still an open question
> in
> > > the
> > > >> > > > current
> > > >> > > > > design. We probably want to agree on this before voting for
> > the
> > > >> KIP.
> > > >> > > > >
> > > >> > > > > 5) The KIP currently adds new config MEMBER_NAME for
> consumer.
> > > Can
> > > >> > you
> > > >> > > > > specify the name of the config key and the default config
> > value?
> > > >> > > Possible
> > > >> > > > > default values include empty string or null (similar to
> > > >> > transaction.id
> > > >> > > > in
> > > >> > > > > producer config).
> > > >> > > > >
> > > >> > > > > 6) Regarding the use of the topic "static_member_map" to
> > persist
> > > >> > member
> > > >> > > > > name map, currently if consumer coordinator broker goes
> > offline,
> > > >> > > > rebalance
> > > >> > > > > is triggered and consumers will try connect to the new
> > > >> coordinator.
> > > >> > If
> > > >> > > > > these consumers can connect to the new coordinator within
> > > >> > > > > max.poll.interval.ms which by default is 5 minutes, given
> > that
> > > >> > broker
> > > >> > > > can
> > > >> > > > > use a deterministic algorithm to determine the partition ->
> > > >> > member_name
> > > >> > > > > mapping, each consumer should get assigned the same set of
> > > >> partitions
> > > >> > > > > without requiring state shuffling. So it is not clear
> whether
> > we
> > > >> > have a
> > > >> > > > > strong use-case for this new logic. Can you help clarify
> what
> > is
> > > >> the
> > > >> > > > > benefit of using topic "static_member_map" to persist member
> > > name
> > > >> > map?
> > > >> > > > >
> > > >> > > > > 7) Regarding the introduction of the expensionTimeoutMs
> > config,
> > > >> it is
> > > >> > > > > mentioned that "we are using expansion timeout to replace
> > > >> rebalance
> > > >> > > > > timeout, which is configured by max.poll.intervals from
> client
> > > >> side,
> > > >> > > and
> > > >> > > > > using registration timeout to replace session timeout".
> > > Currently
> > > >> the
> > > >> > > > > default max.poll.interval.ms is configured to be 5 minutes
> > and
> > > >> there
> > > >> > > > will
> > > >> > > > > be only one rebalance if all new consumers can join within 5
> > > >> minutes.
> > > >> > > So
> > > >> > > > it
> > > >> > > > > is not clear whether we have a strong use-case for this new
> > > >> config.
> > > >> > Can
> > > >> > > > you
> > > >> > > > > explain what is the benefit of introducing this new config?
> > > >> > > > >
> > > >> > > > > 8) It is mentioned that "To distinguish between previous
> > version
> > > >> of
> > > >> > > > > protocol, we will also increase the join group request
> version
> > > to
> > > >> v4
> > > >> > > when
> > > >> > > > > MEMBER_NAME is set" and "If the broker version is not the
> > latest
> > > >> (<
> > > >> > > v4),
> > > >> > > > > the join group request shall be downgraded to v3 without
> > setting
> > > >> the
> > > >> > > > member
> > > >> > > > > Id". It is probably simpler to just say that this feature is
> > > >> enabled
> > > >> > if
> > > >> > > > > JoinGroupRequest V4 is supported on both client and broker
> and
> > > >> > > > MEMBER_NAME
> > > >> > > > > is configured with non-empty string.
> > > >> > > > >
> > > >> > > > > 9) It is mentioned that broker may return
> > > >> NO_STATIC_MEMBER_INFO_SET
> > > >> > > error
> > > >> > > > > in OffsetCommitResponse for "commit requests under static
> > > >> > membership".
> > > >> > > > Can
> > > >> > > > > you clarify how broker determines whether the commit request
> > is
> > > >> under
> > > >> > > > > static membership?
> > > >> > > > >
> > > >> > > > > Thanks,
> > > >> > > > > Dong
> > > >> > > > >
> > > >> > > >
> > > >> > > >
> > > >> > > > --
> > > >> > > > -- Guozhang
> > > >> > > >
> > > >> > >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > -Regards,
> > > >> > Mayuresh R. Gharat
> > > >> > (862) 250-7125
> > > >> >
> > > >>
> > > >
> > > >
> > > > --
> > > > -Regards,
> > > > Mayuresh R. Gharat
> > > > (862) 250-7125
> > > >
> > >
> > >
> > > --
> > > -Regards,
> > > Mayuresh R. Gharat
> > > (862) 250-7125
> > >
> >
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


-- 
-- Guozhang

Reply via email to