@Jason, Good point about disconnects. With that in mind, I agree that a registration id may be a better idea for enabling fencing than validating on host / port.
Guozhang

On Mon, Jul 30, 2018 at 5:40 PM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Guozhang,

Thanks for the detailed response. Really quickly, about the fencing issue: I think host/port will not be sufficient because it cannot handle disconnects. For example, if the coordinator moves to another broker, there is no way we'd be able to guarantee the same host/port. Currently we try to avoid rebalancing when the coordinator moves. That said, I agree in principle with the "first comer wins" approach you've suggested. Basically, a member is only removed from the group if its session expires or it leaves the group explicitly.

-Jason

On Mon, Jul 30, 2018 at 4:24 PM, Mike Freyberger <mfreyber...@appnexus.com> wrote:

Guozhang,

Thanks for giving us a great starting point.

A few questions that come to mind right away:

1) What do you think a reasonable group-change-timeout would be? I am thinking on the order of minutes (5 minutes?).

2) Will the nodes that are still alive continue to make progress during a static membership rebalance? I believe during a rebalance today all consumers wait for the SyncGroupResponse before continuing to read data from the brokers. If that is the case, I think it'd be ideal for all nodes that are still alive during a static group membership change to continue making progress as if nothing happened, so that there is no impact to the majority of the group when one node is bounced (quick restart).

3) Do you think an explicit method for forcing a rebalance would be needed? I am thinking of a scenario such as a disk failure on a node where that node will definitely not come back. Rather than waiting up to the group-change-timeout, I think it'd be good for an admin to be able to force a rebalance immediately. Maybe this is an over-optimization, but I think certain use cases would benefit from static group membership with the ability to force a rebalance at any time.

Best,

Mike

On 7/30/18, 6:57 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:

Hello Boyang / Jason / Mike,

Thanks for your thoughtful inputs! Regarding the fencing issue, I've thought before about leveraging the epoch notion from the PID of transactional messaging, but since in this proposal we do not always require member ids from clients, and hence could have a mix of user-specified member ids and coordinator-generated member ids, the epoch idea may not be well suited for this scenario. Of course, we can tighten the screws a bit by requiring that, for a given consumer group, all consumers must either provide their own member ids or all rely on the consumer coordinator to assign them, which does not sound like a very strict requirement in practice, and all we need to do is add a new field in the join group request (we are proposing to bump up its version anyway). Hence I've also thought about another simple fencing approach, aka "first comer wins": pass the host / port information from KafkaApis to the GroupCoordinator to validate whether it matches the existing member id's cached host / port.
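To make that concrete, here is a minimal sketch of the kind of "first comer wins" host/port check being discussed. The class and method names are hypothetical; this is not how the actual GroupCoordinator code is structured.

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a "first comer wins" check: the first client to register
// a member id pins its host/port, and later joins with the same member id from a
// different address are rejected until the member is removed.
public class StaticMemberFencing {

    private static final class ClientAddress {
        final String host;
        final int port;

        ClientAddress(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof ClientAddress)) return false;
            ClientAddress other = (ClientAddress) o;
            return port == other.port && host.equals(other.host);
        }

        @Override
        public int hashCode() {
            return Objects.hash(host, port);
        }
    }

    private final Map<String, ClientAddress> memberAddresses = new ConcurrentHashMap<>();

    // Returns true if the join should be accepted under "first comer wins".
    public boolean validateJoin(String memberId, String host, int port) {
        ClientAddress incoming = new ClientAddress(host, port);
        ClientAddress existing = memberAddresses.putIfAbsent(memberId, incoming);
        return existing == null || existing.equals(incoming);
    }

    // Called when the member's session expires or it leaves the group explicitly.
    public void removeMember(String memberId) {
        memberAddresses.remove(memberId);
    }
}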
That said, this check does not always guarantee that we fence the right zombies because of "first comer wins" (think of a scenario where the right client gets kicked out, and then before it re-joins, the actual zombie with the same member id joins), and as I mentioned previously it pokes some leaks into the code hierarchy, so I'm also hesitant to do it. If people think it is indeed a must-have rather than a good-to-have, I'd suggest we leverage host/port rather than the epoch mechanism.

------------------------------------------

As for the more general idea of having a static membership protocol to better integrate with cloud environments like k8s, I think the first idea may actually be a better fit.

Just a quick summary of the rebalance issues we face today:

1. Application start: when a multi-instance application is started, multiple rebalances are triggered to migrate state to the newly started instances, since not all instances join at the same time. NOTE that KIP-134 <https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance> is targeted at this issue, but as an afterthought it may not be the optimal solution.
2. Application shutdown: similarly to 1), when a multi-instance application is shutting down, multiple rebalances are triggered.
3. Application scale out: when a new instance is started, one rebalance is executed that shuffles all assignments, rather than just a "partial" shuffling of some of the members.
4. Application scale in: similarly to 3), when an existing instance gracefully shuts down, one rebalance is executed that shuffles all assignments.
5. Application instance bounce (upgrade, config change etc.): one instance shuts down and then restarts, which triggers two rebalances. NOTE that disabling leave-group is targeted at this issue.
6. Application instance failure: one instance fails, and likely a new instance starts to take over its assignment (e.g. on k8s), which triggers two rebalances. The difference from 3) above is that the new instance would not have locally cached tasks.

Among them, I think 1/2/5/6 could potentially be grouped together as "static membership"; 3/4 could be grouped into another category, of allowing an "incremental rebalance" or "partial rebalance" rather than a full rebalance. Of course, having incremental rebalances can help with 1/2/5/6 as well by reducing the cost of each unnecessary rebalance, but ideally we want NO rebalances at all for these cases, which will be more achievable with k8s-style integrations or static membership.

------------------------------------------

So, just to throw in a sketchy idea along this route for 1/2/5/6 to kick off brainstorming:

1. We bump up the JoinGroupRequest with additional fields:

  1.a) a flag indicating "static" or "dynamic" membership protocols.
  1.b) with "static" membership, we also add the pre-defined member id.
  1.c) with "static" membership, we also add an optional "group-change-timeout" value.

2. On the broker side, we enforce only one of the two protocols for all group members: we accept the protocol of the first joined member of the group, and if later joining members indicate a different membership protocol, we reject it.
If the group-change-timeout value differs from the first joined member's, we reject it as well.

3. With dynamic membership, nothing is changed; with static membership, we do the following:

  3.a) never assign member ids; instead, always expect the joining members to come with their own member ids (we could do the fencing based on host / port here).
  3.b) upon receiving the first join group request, use the "group-change-timeout" instead of the session timeout as the rebalance timeout to wait for other members to join. This is for 1) above.
  3.c) upon receiving a leave-group request, use the "group-change-timeout" to wait for more members to leave the group as well, or for the departed members to re-join. After the timeout we trigger a rebalance with whatever is left in the members list. This covers 2) (expecting other members to send leave-group) and 5/6 (expecting the departed member to re-join).

4. As a result, we will deprecate KIP-134 and disable-on-leave-group as well.

The key idea is that, with "static" membership, groups should be created or terminated as a whole, and dynamic member changes are not expected often. Hence we would not react to those membership-changing events immediately, but wait for the longer specified time, expecting some other system like k8s to restore the group members. WDYT?

Guozhang

On Mon, Jul 30, 2018 at 3:05 PM, Mike Freyberger <mfreyber...@appnexus.com> wrote:

Jason,

I really appreciate the broader conversation that you are bringing up here.

I've been working on an application that does streaming joins for a while now, and we face a similar issue with group membership being dynamic. We are currently using our own StickyAssignor and take special care during rolling restarts to make sure consumer assignments do not change.

I think a feature that allows group membership to be fixed, along with a CLI for adding or removing a node from the group, would be ideal. This reminds me of some of the work by the DynamoDB team about 10 years back, when they differentiated transient failures from permanent failures to deal with problems like this.

Best,

Mike

On 7/30/18, 5:36 PM, "Jason Gustafson" <ja...@confluent.io> wrote:

Hi Boyang,

Thanks for the response. I think the main point I was trying to make is the need for fencing. I am not too concerned about how to generate a unique id on the client side. The approach you suggested for streams seems reasonable. However, any time you reuse an id, you need to be careful that there is only one instance that can use it at any time. We are always running into problems where a previous instance of an application comes back to life unexpectedly after we had already presumed it was dead. Fencing ensures that even if this happens, it cannot do any damage. I would say that some protection from zombies is a requirement here.

The second point was more abstract and mainly meant to initiate some discussion. We have gone through several iterations of improvements to try and reduce the rebalancing in consumer applications.
We started > > out > > > trying > > > to tune the session timeout. We have added an internal config > to > > skip > > > leaving the group when streams shuts down. The broker now has a > > config > > > to > > > delay rebalances in case all consumers join at about the same > > time. The > > > approach in this KIP is a step in a more principled direction, > > but it > > > still > > > feels like we are making this unnecessarily hard on ourselves > by > > > insisting > > > that group membership is a dynamic concept. In practice, the > > number of > > > nodes dedicated to an application tends to remain fixed for > long > > > periods of > > > time and only scales up or down when needed. And these days > > you've got > > > frameworks like kubernetes which can automatically provision > new > > nodes > > > if > > > one fails. So the argument for dynamic membership is becoming > > weaker > > > in my > > > opinion. This KIP is basically trying to impose a small degree > of > > > static > > > membership anyway so that rolling restarts do not change > > membership. > > > Anyway, curious to hear some thoughts about this from you and > the > > > others > > > who work on streams. > > > > > > Thanks, > > > Jason > > > > > > > > > On Sat, Jul 28, 2018 at 4:44 PM, Boyang Chen < > > bche...@outlook.com> > > > wrote: > > > > > > > Thanks for the replies, James and Jason. Let me try to > > summarize your > > > > concerns. > > > > > > > > > > > > I think James' question is primarily the severity of user > > using this > > > > config wrongly. The impact would be that the same member id > > being > > > used by > > > > multiple or even all of the consumers. The assignment > protocol > > > couldn't > > > > distinguish any of the overlapping consumers, thus assigning > > the > > > exact same > > > > partitions multiple times to different consumers. I would say > > the > > > processed > > > > result would be including a lot of duplicates and unnecessary > > heavy > > > load on > > > > the client side, The correctness will depend on the user > logic, > > > however I'm > > > > pessimistic. > > > > > > > > > > > > Although the impact is very high, the symptom is not hard to > > triage, > > > > because user could visualize consumer identity overlapping > > fairly > > > easily by > > > > exported consumer metrics. On the user standpoint, they would > > be > > > fully > > > > aware of the potential erratic status before enabling " > > member.id" > > > > configuration IMO. Let me know your thoughts James! > > > > > > > > > > > > Next is Jason's suggestion. Jason shared a higher viewpoint > and > > > pointed > > > > out the problem that we need to solve is to maintain "a > strong > > bias > > > towards > > > > being able to reuse previous state". The proposed approach is > > to > > > separate > > > > the notion of consumer membership and consumer identity. > > > > > > > > > > > > The original idea of this KIP was on the Stream application, > > so I > > > > understand that the identities of multiple consumers belong > to > > one > > > > instance, where each Stream thread will be using one > dedicated > > main > > > > consumer. So in a Stream use case, we could internally > generate > > > member id > > > > with USER_DEFINED_ID + STREAM_THREAD_ID. > > > > > > > > > > > > In pure consumer use case, this could be a little bit > > challenging > > > since > > > > user could arbitrarily initiate multiple consumers on the > same > > > instance > > > > which is out of our library control. 
This could add up the > > > possibility of > > > > member id collision. So instead of making developers life > > easier, > > > > introducing member id config could break the existing code > > logic and > > > take > > > > long time to understand and fix. Although I still assume this > > is an > > > > advanced config, user may use member id config even before > > they fully > > > > understand the problem, and use the same set of > initialization > > logic > > > cross > > > > multiple consumers on the same instance. > > > > > > > > > > > > I hope I have explained my understanding of the pros and cons > > of > > > this KIP > > > > better. Remember the core argument of this KIP: If the broker > > > recognize > > > > this consumer as an existing member, it shouldn't trigger > > rebalance. > > > If we > > > > build our discussion on top of this argument, the client > > management > > > of > > > > group membership could be tricky at first, but considering > our > > > original > > > > motivation to leader-follower rebalance model, I feel that > > having > > > broker to > > > > create membership info and let client maintain would be less > > > appealing and > > > > fragile. Having client generate membership data could build > up > > > > source-of-truth model and streamline the current > architecture. > > We > > > need also > > > > consider flexibility introduced by this KIP for cloud users > to > > > coordinate > > > > consumer/stream instances more freely. Honestly, I'm > > interested in > > > Jason's > > > > registration id proposal and open to more voices, but I feel > it > > > would be > > > > more complex than the current KIP for user to understand. > Hope > > this > > > makes > > > > sense, Jason. > > > > > > > > > > > > Thank you again for the feedback! > > > > > > > > > > > > Best, > > > > > > > > Boyang > > > > > > > > > > > > ________________________________ > > > > From: Jason Gustafson <ja...@confluent.io> > > > > Sent: Saturday, July 28, 2018 6:50 AM > > > > To: dev > > > > Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer > > rebalances > > > by > > > > specifying member id > > > > > > > > Hey Boyang, > > > > > > > > Thanks for the KIP. I think my main question is in the same > > vein as > > > James'. > > > > The problem is that the coordinator needs to be able to > > identify > > > which > > > > instance of a particular memberId is the active one. For EOS, > > each > > > > transactionalId gets an epoch. When a new producer is > started, > > it > > > bumps the > > > > epoch which allows the transaction coordinator to fence off > any > > > zombie > > > > instances which may try to continue doing work with the old > > epoch. > > > It seems > > > > like we need a similar protection for consumer members. > > > > > > > > Suppose for example that we distinguish between a > registration > > id > > > which is > > > > provided by the user and a member id which is assigned > > uniquely by > > > the > > > > coordinator. In the JoinGroup request, both the registration > > id and > > > the > > > > member id are provided. When a consumer is first started, it > > doesn't > > > know > > > > the memberId, so it it provides only the registration id. The > > > coordinator > > > > can then assign a new memberId and invalidate the previous > one > > that > > > was > > > > associated with the registration id. This would then fence > off > > the > > > previous > > > > instance which was still trying to use the member id. 
Taking a little bit of a step back, I think the main observation in this KIP is that applications with heavy local state need to have a strong bias toward being able to reuse that state. It is a bit like Kafka itself, in the sense that a replica is not moved just because the broker is shut down, since the cost of moving the log is extremely high. I'm wondering if we need to think about streams applications in a similar way. Should there be a static notion of the members of the group so that streams can make rebalancing decisions more easily without depending so heavily on transient membership? I feel the hacks we've put in place in some cases to avoid rebalances are a bit brittle; delaying group joining is one example of this. If you knew ahead of time who the stable members of the group were, then this would not be needed. Anyway, just a thought.

Thanks,
Jason

On Fri, Jul 27, 2018 at 1:58 PM, James Cheng <wushuja...@gmail.com> wrote:

When you say that it will "break", what does this breakage look like? Will the consumer group be non-functional? Will just those instances be non-functional? Or will the group be functional, but the rebalancing be non-optimal and require more round-trips/data-transfer (similar to the current algorithm)?

I'm trying to assess the potential for user error and its impact.

-James

On Jul 27, 2018, at 11:25 AM, Boyang Chen <bche...@outlook.com> wrote:

Hey James,

The algorithm relies on the client side to provide a unique consumer member id. It will break unless we enforce some sort of validation (host + port) on the server side. To simplify the first version, we do not plan to enforce validation. A good comparison would be the EOS producer, which is in charge of generating a unique transaction id sequence. IMO, for broker logic the tolerance of client-side error is not unlimited.

Thank you!

________________________________
From: James Cheng <wushuja...@gmail.com>
Sent: Saturday, July 28, 2018 1:26 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-345: Reduce multiple consumer rebalances by specifying member id

On Jul 26, 2018, at 11:09 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Boyang,

Thanks for the proposed KIP. I made a pass over the wiki and here are some comments / questions:

1. In order to preserve broker compatibility, we need to make sure the broker version discovery logic can be integrated with this new logic.
I.e., if a newer versioned consumer is talking to an older versioned broker which does not recognize V4, the client needs to downgrade its JoinGroupRequest version to V3 and not set the member id. You can take a look at the ApiVersionsRequest and see how to work with it.

2. There may be ways to validate that two different clients do not send the same member id, for example by passing along the host:port information from KafkaApis to the GroupCoordinator interface. But I think this overly complicates the logic and may not be worthwhile compared to relying on users to specify unique member ids.

Boyang,

Thanks for the KIP! How will the algorithm behave if multiple consumers provide the same member id?

-James

3. Minor: you would need to bump up the version of JoinGroupResponse to V4 as well.

4. Minor: in the wiki page, you need to specify the actual string value for `MEMBER_ID`, for example "member.id".

5. When this additional config is specified by users, we should consider setting the default of the internal `LEAVE_GROUP_ON_CLOSE_CONFIG` to false, since otherwise its effectiveness would be reduced.

Guozhang

On Thu, Jul 26, 2018 at 9:20 PM, Boyang Chen <bche...@outlook.com> wrote:

Hey friends,

I would like to open a discussion thread on KIP-345:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Reduce+multiple+consumer+rebalances+by+specifying+member+id

This KIP is trying to resolve multiple rebalances by maintaining the consumer member id across rebalance generations. I have verified the theory on our internal Streams application, and it can reduce rebalance time to a few seconds when the service is rolling restarted.

Let me know your thoughts, thank you!

Best,

Boyang

--
-- Guozhang
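For illustration only: if the KIP lands with the "member.id" property name discussed above, a consumer might opt in roughly as sketched below. The property is a proposal in this thread, not an existing consumer config, and the group, topic, and instance names are made up.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "payments-app");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Proposed in this thread (not an existing consumer config): a stable member id
        // that the broker would recognize across restarts, so a bounced instance can
        // rejoin without triggering a rebalance.
        props.put("member.id", "payments-app-instance-1");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("payments"));
            // A normal poll loop would follow here.
        }
    }
}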