Thanks Konstantine for correcting the details in the proposal! I have addressed
them below:


> In a few places the new/proposed changes are referred to as "current".
> Which is a bit confusing considering that there is a protocol in place
> already, and by "current" someone might understand the existing one.

Replaced `current` with `proposed` wherever it referred to the new changes!

> There's the following sentence in the "Public Interfaces" section:
> "Since for many stateful consumer/stream applications, the state shuffling
> is more painful than short time partial unavailability."
> However, my understanding is that the changes proposed with KIP-345 will
> not exploit any partial availability.
We are proposing to extend the session timeout under static membership and to
change the rebalance timeout so that it no longer removes members that have not
rejoined, which means the broker would be slower to detect a consumer failure.
This is what I mean by "partial unavailability": some topic partitions make no
progress because their consumer is dead or hanging.
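
To illustrate the trade-off, here is a minimal Python sketch. It is not the broker implementation: the names `Member` and `complete_rebalance` are hypothetical, and the only assumption is the behavior described above, namely that a member which misses the rebalance timeout stays in the group until its session timeout expires.

```python
from dataclasses import dataclass


@dataclass
class Member:
    member_id: str
    subscription: list       # topics from the member's last JoinGroup
    last_heartbeat_ms: int   # time of the last heartbeat seen by the broker
    session_timeout_ms: int
    rejoined: bool = False   # did it send JoinGroup during this rebalance?


def complete_rebalance(members, now_ms):
    """Return the members that make up the next generation.

    Assumed (proposed) behavior: a member that did not rejoin before the
    rebalance timeout is still kept, with its old subscription, as long as
    its session has not expired.
    """
    next_generation = []
    for m in members:
        session_expired = now_ms - m.last_heartbeat_ms > m.session_timeout_ms
        if m.rejoined or not session_expired:
            next_generation.append(m)
    return next_generation


members = [
    Member("a", ["orders"], last_heartbeat_ms=9_000, session_timeout_ms=30_000, rejoined=True),
    # "b" is restarting: no JoinGroup yet, but its session is still alive.
    Member("b", ["orders"], last_heartbeat_ms=1_000, session_timeout_ms=30_000),
    # "c" has been silent for longer than its session timeout.
    Member("c", ["orders"], last_heartbeat_ms=0, session_timeout_ms=5_000),
]
kept = [m.member_id for m in complete_rebalance(members, now_ms=10_000)]
print(kept)
```

While `b` is down, the partitions it owns make no progress until it comes back or its session expires; that window is the "partial unavailability" in question.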

> In the rejected alternatives, under point 2) I read "we can copy the member
> id to the config files". I believe it means to say "member name" unless I'm
> missing something about reusing member ids....
Updated that section!

Let me know if this makes sense to you!

Boyang
________________________________
From: Konstantine Karantasis <konstant...@confluent.io>
Sent: Wednesday, November 21, 2018 2:18 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by 
specifying member id

Hi Boyang.

Thanks for preparing this KIP! It is making good progress and will be a
great improvement for stateful Kafka applications.

Apologies for my late reply, I was away for a while. Lots of great comments
so far, so I'll probably second most of them in what I suggest below at
this point.

When I first read the KIP, I wanted to start at the end with something that
wasn't highlighted a lot. That was the topic related to handling duplicate
members. I see now that the initial suggestion of handling this situation
during offset commit has been removed, and I agree with that. Issues
related to membership seem to be handled better when the member joins the
group rather than when it tries to commit offsets. This also simplifies how
many request types need to change in order to incorporate the new member
name field.
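
For illustration only, the join-time check could look roughly like the Python sketch below. It follows the rule from Boyang's earlier message quoted further down (reject a join that carries a known member name but no member id). The class and method names are hypothetical, `DuplicateStaticMember` stands in for the DUPLICATE_STATIC_MEMBER error code mentioned in the thread, and rejecting a mismatched member id is an extra assumption of this sketch.

```python
import uuid

UNKNOWN_MEMBER_ID = ""  # what a client sends before the broker assigns an id


class DuplicateStaticMember(Exception):
    """Hypothetical stand-in for the DUPLICATE_STATIC_MEMBER error code."""


class Group:
    def __init__(self):
        self.static_members = {}  # member name -> member id

    def handle_join(self, member_name, member_id=UNKNOWN_MEMBER_ID):
        if not member_name:
            # Dynamic member: keep existing behavior, assign an id if needed.
            return member_id or uuid.uuid4().hex

        known_id = self.static_members.get(member_name)
        if known_id is None:
            # First time this member name is seen: register it.
            new_id = uuid.uuid4().hex
            self.static_members[member_name] = new_id
            return new_id

        if member_id != known_id:
            # Known name with no member id (or, as an assumption of this
            # sketch, a mismatched one): the identity is already in use.
            raise DuplicateStaticMember(member_name)
        return known_id


group = Group()
first_id = group.handle_join("consumer-1")
# Rejoining with the assigned member id is accepted.
assert group.handle_join("consumer-1", first_id) == first_id

try:
    group.handle_join("consumer-1")  # same name, no member id
    rejected = False
except DuplicateStaticMember:
    rejected = True
print(rejected)
```

Because the check lives entirely in the join path, no other request type needs to carry the member name in this sketch.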

I also agree with what Jason and Guozhang have said regarding timeouts.
Although semantically, it's easier to think of every operation having its
own timeout, operationally this can become a burden. Thus, consolidation
seems preferable here. The definition of embedded protocols on top of the
base group membership protocol for rebalancing gives enough flexibility to
address such needs in each client component separately.
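
As a small illustration of the consolidated approach, the Python sketch below models a static member that simply reuses `session.timeout.ms` with a larger value instead of a separate registration timeout. The helper `check_consumer_config` and the sample values are hypothetical; `member.name` is the config key proposed in this thread, and the bounds are the broker defaults quoted in Guozhang's message below (6 seconds and 5 minutes).

```python
# Broker-side bounds as quoted later in this thread (6 seconds and 5 minutes).
MIN_SESSION_TIMEOUT_MS = 6_000
MAX_SESSION_TIMEOUT_MS = 300_000


def check_consumer_config(config):
    """Return True if the config describes a static member.

    Raises ValueError when session.timeout.ms falls outside the broker bounds.
    """
    session_timeout_ms = config["session.timeout.ms"]
    if not MIN_SESSION_TIMEOUT_MS <= session_timeout_ms <= MAX_SESSION_TIMEOUT_MS:
        raise ValueError(f"session.timeout.ms={session_timeout_ms} is out of range")
    # "member.name" is the config key proposed in this thread.
    return bool(config.get("member.name"))


dynamic_consumer = {"group.id": "orders", "session.timeout.ms": 10_000}
static_consumer = {
    "group.id": "orders",
    "member.name": "orders-consumer-0",  # hypothetical value
    "session.timeout.ms": 120_000,       # raised to ride out a restart
}
print(check_consumer_config(dynamic_consumer), check_consumer_config(static_consumer))
```

The point of the sketch is that a single timeout carries both roles: the only difference between the two consumers is the member name and a larger session timeout.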

Finally, some minor comments:
In a few places the new/proposed changes are referred to as "current".
Which is a bit confusing considering that there is a protocol in place
already, and by "current" someone might understand the existing one. I'd
recommend using new/proposed or equivalent when referring to changes
introduced with KIP-345 and current/existing or equivalent when referring
to existing behavior.

There's the following sentence in the "Public Interfaces" section:
"Since for many stateful consumer/stream applications, the state shuffling
is more painful than short time partial unavailability."
However, my understanding is that the changes proposed with KIP-345 will
not exploit any partial availability. A suggestion for dealing with
temporary imbalances has been made in "Incremental Cooperative Rebalancing"
which can work well with KIP-345, but here I don't see proposed changes
that suggest that some resources (e.g. partitions) will keep being used
while others will not be utilized. Thus, you might want to adjust this
sentence. Correct me if I'm missing something related to that.

In the rejected alternatives, under point 2) I read "we can copy the member
id to the config files". I believe it means to say "member name" unless I'm
missing something about reusing member ids. Also below I read: "By allowing
consumers to optionally specifying a member id" which probably implies
"member name" again. In a sense this section highlights a potential
confusion between member name and member id. I wonder if we could come up
with a better term for the new field. StaticTag, StaticLabel, or even
StaticName are some suggestions that could potentially help with confusion
between MemberId and MemberName and what corresponds to what. But I
wouldn't like to disrupt the discussion with naming conventions too much at
this point. I just mention it here as a thought.

Looking forward to seeing the final details of this KIP. Great work so far!

Konstantine


On Tue, Nov 20, 2018 at 4:23 AM Boyang Chen <bche...@outlook.com> wrote:

> Thanks Guozhang for the great summary here. I have been following up on
> the action items.
>
>
>   1.  I already updated the KIP to remove the expansion timeout and
> registration timeout. Great to see them being addressed in client side!
>   2.  I double checked the design and I believe that it is ok to have both
> static member and dynamic member co-exist in the same group. So the upgrade
> shouldn't be destructive and we are removing the two membership protocol
> switching APIs.
>   3.  I only have a question about this one. I'm still reading the KafkaApis
> code here. Should I just use the same authorization logic for
> ForceStaticRebalanceRequest as JoinGroupRequest?
>   4.  I'm very excited to see this work with K8! Like you suggested, this
> feature could be better addressed in a separate KIP because it is pretty
> independent. I could start drafting the KIP once the current proposal is
> approved.
>   5.  I believe that we don't need fencing in the offset commit request,
> since the duplicate member.name issue can be handled by the join group
> request. We shall reject a join group request with a known member name but
> no member id (which means we already have an active member using this
> identity).
>   6.  I agree to remove that internal config once we move forward with
> static membership. And I already removed the entire section from the KIP.
>
> Let me know if you have other concerns.
>
> Best,
> Boyang
> ________________________________
> From: Guozhang Wang <wangg...@gmail.com>
> Sent: Tuesday, November 20, 2018 4:21 PM
> To: dev
> Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by
> specifying member id
>
> Hello Boyang,
>
> Thanks a lot for the KIP! It is a great write-up and I appreciate your
> patience in answering the feedback from the community. I'd like to add my
> 2 cents here:
>
> 1. By introducing another two timeout configs, registration_timeout and
> expansion_timeout, we are effectively having four timeout configs: session
> timeout, rebalance timeout (configured as "max.poll.interval.ms" on client
> side), and these two. The interplay of these timeout configs can be quite
> hard for users to reason about, and hence I'm wondering if we can simplify
> the situation with as few timeout configs as possible. Here is a concrete
> suggestion I'd like to propose:
>
> 1.a) Instead of introducing a registration_timeout in addition to the
> session_timeout for static members, we can just reuse the session_timeout
> and ask users to set it to a larger value when they are upgrading a dynamic
> client to a static client by setting the "member.name" at the same time. By
> default, the broker-side min.session.timeout is 6 seconds and
> max.session.timeout is 5 minutes, which seems reasonable to me (we can of
> course modify this broker config to enlarge the valid interval if we want
> in practice). And then we should also consider removing the condition for
> marking a client as failed if the rebalance timeout has reached while the
> JoinGroup was not received, so that the semantics of session_timeout and
> rebalance_timeout are totally separated: the former is only used to
> determine if a consumer member of the group should be marked as failed and
> kicked out of the group, and the latter is only used to determine the
> longest time coordinator should wait for PREPARE_REBALANCE phase. In other
> words if a member did not send the JoinGroup in time of the
> rebalance_timeout, we still include it in the new generation of the group
> and use its old subscription info to send to leader for assignment. Later
> if the member came back with HeartBeat request, we can still follow the
> normal path to bring it to the latest generation while checking that its
> sent JoinGroup request contains the same subscription info as we used to
> assign the partitions previously (which should be likely the case in
> practice). In addition, we should have static members not send the
> LeaveGroup request when they shut down gracefully, so that a static member
> can only leave the group if its session has timed out, OR it has been
> indicated to no longer exist in the group (details below).
>
> 1.b) We have a parallel discussion about Incremental Cooperative
> Rebalancing, in which we will encode the "when to rebalance" logic at the
> application level, instead of at the protocol level. By doing this we can
> also enable a few other optimizations, e.g. at the Streams level to first
> build up the state store as standby tasks and then trigger a second
> rebalance to actually migrate the active tasks while keeping the actual
> rebalance latency and hence unavailability window to be small (
> https://issues.apache.org/jira/browse/KAFKA-6145).
> I'd propose we align
> KIP-345 along with this idea, and hence do not add the expansion_timeout as
> part of the protocol layer, but only do that at the application's
> coordinator / assignor layer (Connect, Streams, etc). We can still
> deprecate the "group.initial.rebalance.delay.ms" though as part of this KIP
> since we have discussed about its limit and think it is actually not a very
> good design and could be replaced with client-side logic above.
>
>
> 2. I'd like to see your thoughts on the upgrade path for this KIP. More
> specifically, let's say after we have upgraded broker version to be able to
> recognize the new versions of JoinGroup request and the admin requests, how
> should we upgrade the clients and enable static groups? Off the top of my
> head, if we do a rolling bounce in which we set the member.name config as
> well as optionally increase the session.timeout config when we bounce each
> instance, then during this rolling bounce we will have a group containing
> both dynamic members and static members. It means that we should have
> the group allow such a scenario (i.e. we cannot reject JoinGroup requests
> from dynamic members), and hence the "member.name" -> "member.id" mapping
> will only be partial in this scenario. Also could you describe if the
> upgrade to the first version that supports this feature would ever get any
> benefits, or only the future upgrade path for rolling bounces could get
> benefits out of this feature?
>
> If that's the case and we will do 1) as suggested above, do we still need
> the enableStaticMembership and enableDynamicMembership admin requests any
> more? It seems they are not necessary any more, as we will only have the
> notion of "dynamic or static members" that can co-exist in a group while
> there is no notion of "dynamic or static groups".
>
>
> 3. We need to briefly talk about the implications for ACL as we introduce
> new admin requests that are related to a specific group.id. For example, we
> need to make sure that whoever created the group or joined the group can
> actually send admin requests for the group, otherwise the application
> owners need to bother the Kafka operators on a multi-tenant cluster every
> time they want to send any admin requests for their groups which would be
> an operational nightmare.
>
>
> 4. I like Jason's suggestion of adding an optional field for the list of
> member names, and I'm wondering if that can be done as part of the
> forceStaticRebalance request: i.e. by passing a list of members, we will
> enforce a rebalance immediately since it indicates that some static member
> will be officially kicked out of the group and some new static members may
> be added. So back to 1.a) above, a static member can only be kicked out of
> the group if a) its session (arguably a long period of time) has timed out,
> or b) this admin request explicitly states that it is no longer part of the
> group. As for execution I'm fine with keeping it as future work for this
> KIP if you'd like to make its scope smaller.
>
> Following are minor comments:
>
> 5. I'm not sure if we need to include "member.name" as part of the
> OffsetCommitRequest for fencing purposes, as I think the memberId plus the
> generation number should be sufficient for fencing even with static
> members.
>
> 6. As mentioned above, if we agree to do 1) we can get rid of the "
> LEAVE_GROUP_ON_CLOSE_CONFIG" config.
>
>
> Guozhang
>
>
>
>
> On Sat, Nov 17, 2018 at 5:53 PM Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Boyang,
> >
> > Thanks for the proposal! This is very useful. I have some comments below:
> >
> > 1) The motivation currently explicitly states that the goal is to improve
> > performance for heavy-state applications. It seems that the motivation can
> > be stronger with the following use-case. Currently, for a MirrorMaker
> > cluster with e.g. 100 MirrorMaker processes, it will take a long time to
> > rolling bounce the entire MirrorMaker cluster. Each MirrorMaker process
> > restart will trigger a rebalance, which currently pauses the consumption
> > of all the partitions of the MirrorMaker cluster. With the change stated
> > in this patch, as long as a MirrorMaker can restart within the specified
> > timeout (e.g. 2 minutes), we only need a constant number of rebalances
> > (e.g. for leader restart) for the entire rolling bounce, which will
> > significantly improve the availability of the MirrorMaker pipeline. In my
> > opinion, the main benefit of the KIP is to avoid unnecessary rebalances if
> > the consumer process can be restarted soon, which helps performance even
> > if the overhead of state shuffling for a given process is small.
> >
> > 2) In order to simplify the KIP reading, can you follow the writeup style
> > of other KIPs (e.g. KIP-98) and list the interface changes such as new
> > configs (e.g. registration timeout), new request/response, new AdminClient
> > API and new error code (e.g. DUPLICATE_STATIC_MEMBER)? Currently some of
> > these are specified in the Proposed Change section, which makes it a bit
> > inconvenient to understand the new interface that will be exposed to
> > users. Explanation of the current two-phase rebalance protocol probably
> > can be moved out of the public interface section.
> >
> > 3) There are currently two versions of JoinGroupRequest in the KIP and
> > only one of them has the field memberId. This seems confusing.
> >
> > 4) It is mentioned in the KIP that "An admin API to force rebalance could
> > be helpful here, but we will make a call once we finished the major
> > implementation". So this seems to be still an open question in the current
> > design. We probably want to agree on this before voting for the KIP.
> >
> > 5) The KIP currently adds a new config MEMBER_NAME for the consumer. Can
> > you specify the name of the config key and the default config value?
> > Possible default values include empty string or null (similar to
> > transactional.id in the producer config).
> >
> > 6) Regarding the use of the topic "static_member_map" to persist the
> > member name map: currently, if the consumer coordinator broker goes
> > offline, a rebalance is triggered and consumers will try to connect to the
> > new coordinator. If these consumers can connect to the new coordinator
> > within max.poll.interval.ms, which by default is 5 minutes, then given that
> > the broker can use a deterministic algorithm to determine the partition ->
> > member_name mapping, each consumer should get assigned the same set of
> > partitions without requiring state shuffling. So it is not clear whether
> > we have a strong use-case for this new logic. Can you help clarify what
> > the benefit is of using the topic "static_member_map" to persist the
> > member name map?
> >
> > 7) Regarding the introduction of the expensionTimeoutMs config, it is
> > mentioned that "we are using expansion timeout to replace rebalance
> > timeout, which is configured by max.poll.intervals from client side, and
> > using registration timeout to replace session timeout". Currently the
> > default max.poll.interval.ms is configured to be 5 minutes and there will
> > be only one rebalance if all new consumers can join within 5 minutes. So
> > it is not clear whether we have a strong use-case for this new config. Can
> > you explain what is the benefit of introducing this new config?
> >
> > 8) It is mentioned that "To distinguish between previous version of
> > protocol, we will also increase the join group request version to v4 when
> > MEMBER_NAME is set" and "If the broker version is not the latest (< v4),
> > the join group request shall be downgraded to v3 without setting the
> > member Id". It is probably simpler to just say that this feature is
> > enabled if JoinGroupRequest V4 is supported on both client and broker and
> > MEMBER_NAME is configured with non-empty string.
> >
> > 9) It is mentioned that broker may return NO_STATIC_MEMBER_INFO_SET error
> > in OffsetCommitResponse for "commit requests under static membership". Can
> > you clarify how broker determines whether the commit request is under
> > static membership?
> >
> > Thanks,
> > Dong
> >
>
>
> --
> -- Guozhang
>
