Cool, will update the page.

On Wed, May 22, 2019 at 9:38 PM Matthias J. Sax <matth...@confluent.io> wrote:
> SGTM. > > I think we should still document the behavior on the KIP and explain why > it's implemented that way. > > > -Matthias > > On 5/22/19 7:04 PM, Guozhang Wang wrote: > > Hello Matthias, > > > > I've thought about that before, and the reason I did not include this as > > part of the KIP-429 scope is that fetcher / coordinator may get quite > > complicated to return non-empty data if > "updateAssignmentMetadataIfNeeded" > > returns false in KafkaConsumer. In addition, when there's a rebalance in > > progress, letting consumers to process data which potentially may take > > longer time (in Streams for example, it is related to `max.poll.interval` > > config) could lead to higher chance of "partitions lost" and wasted > > processing work. > > > > So I've decided to still keep it as simple as is today, and admittedly > from > > a user perspective, they may see consecutive "poll" call returning no > data. > > I will create a JIRA ticket capturing this idea for future discussion > > whether we should consider this as a general optimization in consumer. > Does > > that sound good to you? > > > > > > Guozhang > > > > > > On Wed, May 22, 2019 at 4:31 AM Matthias J. Sax <matth...@confluent.io> > > wrote: > > > >> Thanks to the KIP and starting the VOTE Guozhang. I am overall +1. > >> > >> One follow up thought: the KIP does not discuss in details, how `poll()` > >> will behave after the change. It might actually be important to ensure > >> that `poll()` behavior changes to be non-blocking to allow an > >> application to process data from non-revoked partitions while a > >> rebalance is happening in the background. > >> > >> Thoughts? > >> > >> > >> -Matthias > >> > >> > >> On 5/10/19 1:10 AM, Guozhang Wang wrote: > >>> Hello Matthias, > >>> > >>> I'm proposing to change this behavior holistically inside > >>> ConsumerCoordinator actually. In other words I'm trying to piggy-back > >> this > >>> behavioral fix of KAFKA-4600 along with this KIP, and the motivation > for > >> me > >>> to do this piggy-backing is that, with incremental rebalancing, there > >> would > >>> be partial affected partitions as we are not revoking every body any > >> more. > >>> > >>> > >>> Guozhang > >>> > >>> > >>> On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io> > >>> wrote: > >>> > >>>> Thanks Guozhang! > >>>> > >>>> The simplified upgrade path is great! > >>>> > >>>> > >>>> Just a clarification question about the "Rebalance Callback Error > >>>> Handling" -- does this change affect the `ConsumerCoordinator` only if > >>>> incremental rebalancing is use? Or does the behavior also change for > the > >>>> eager rebalancing case? > >>>> > >>>> > >>>> -Matthias > >>>> > >>>> > >>>> On 5/9/19 3:37 AM, Guozhang Wang wrote: > >>>>> Hello all, > >>>>> > >>>>> Thanks for everyone who've shared their feedbacks for this KIP! If > >>>> there's > >>>>> no further comments I'll start the voting thread by end of tomorrow. > >>>>> > >>>>> > >>>>> Guozhang. > >>>>> > >>>>> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com> > >> wrote: > >>>>> > >>>>>> Hello Boyang, > >>>>>> > >>>>>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com> > >> wrote: > >>>>>> > >>>>>>> Hey Guozhang, > >>>>>>> > >>>>>>> thank you for the great write up. Overall the motivation and > changes > >>>>>>> LGTM, just some minor comments: > >>>>>>> > >>>>>>> > >>>>>>> 1. 
In "Consumer Coordinator Algorithm", we could reorder > alphabet > >>>>>>> points for 3d~3f from ["ready-to-migrate-partitions", > >>>>>>> "unknown-but-owned-partitions", "maybe-revoking-partitions"] to > >>>>>>> ["maybe-revoking-partitions", "ready-to-migrate-partitions", > >>>>>>> "unknown-but-owned-partitions"] in order to be consistent with > 3c1~3. > >>>>>>> > >>>>>> > >>>>>> Ack. Updated. > >>>>>> > >>>>>> > >>>>>>> 2. In "Consumer Coordinator Algorithm", 1c suggests to revoke > all > >>>>>>> partition upon heartbeat/commit fail. What's the gain here? Do we > >> want > >>>> to > >>>>>>> keep all partitions running at this moment, to be optimistic for > the > >>>> case > >>>>>>> when no partitions get reassigned? > >>>>>>> > >>>>>> > >>>>>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can > >> just > >>>>>> re-join the group with all the currently owned partitions encoded. > >>>> Updated. > >>>>>> > >>>>>> > >>>>>>> 3. In "Recommended Upgrade Procedure", remove extra 'those': " > The > >>>>>>> 'sticky' assignor works even those there are " > >>>>>>> > >>>>>> > >>>>>> Ack, should be `even when`. > >>>>>> > >>>>>> > >>>>>>> 4. Put two "looking into the future" into a separate category > from > >>>>>>> migration session. It seems inconsistent for readers to see this > >>>> before we > >>>>>>> finished discussion for everything. > >>>>>>> > >>>>>> > >>>>>> Ack. > >>>>>> > >>>>>> > >>>>>>> 5. Have we discussed the concern on the serialization? Could the > >> new > >>>>>>> metadata we are adding grow larger than the message size cap? > >>>>>>> > >>>>>> > >>>>>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149 > >> which > >>>>>> should largely reduce the message size (will update the wiki > >>>> accordingly as > >>>>>> well). > >>>>>> > >>>>>> > >>>>>>> > >>>>>>> Boyang > >>>>>>> > >>>>>>> ________________________________ > >>>>>>> From: Guozhang Wang <wangg...@gmail.com> > >>>>>>> Sent: Monday, April 15, 2019 9:20 AM > >>>>>>> To: dev > >>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka > >> Streams > >>>>>>> > >>>>>>> Hello Jason, > >>>>>>> > >>>>>>> I agree with you that for range / round-robin it makes less sense > to > >> be > >>>>>>> compatible with cooperative rebalance protocol. > >>>>>>> > >>>>>>> As for StickyAssignor, however, I think it would still be possible > to > >>>> make > >>>>>>> the current implementation to be compatible with cooperative > >>>> rebalance. So > >>>>>>> after pondering on different options at hand I'm now proposing this > >>>>>>> approach as listed in the upgrade section: > >>>>>>> > >>>>>>> > >>>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath > >>>>>>> > >>>>>>> The idea is to let assignors specify which protocols it would work > >>>> with, > >>>>>>> associating with a different name; then the upgrade path would > >> involve > >>>> a > >>>>>>> "compatible" protocol which actually still use eager behavior while > >>>>>>> encoding two assignors if possible. 
In "Rejected Section" (just to > >>>> clarify > >>>>>>> I'm not finalizing it as rejected, just putting it there for now, > and > >>>> if > >>>>>>> we > >>>>>>> like this one instead we can always switch them) I listed the other > >>>>>>> approach we once discussed about, and arguing its cons of > duplicated > >>>> class > >>>>>>> seems overwhelm the pros of saving the "rebalance.protocol" > config. > >>>>>>> > >>>>>>> Let me know WDYT. > >>>>>>> > >>>>>>> Guozhang > >>>>>>> > >>>>>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson < > ja...@confluent.io> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Guozhang, > >>>>>>>> > >>>>>>>> Responses below: > >>>>>>>> > >>>>>>>> 2. The interface's default implementation will just be > >>>>>>>>> `onPartitionRevoked`, so for user's instantiation if they do not > >> make > >>>>>>> any > >>>>>>>>> code changes they should be able to recompile the code and > >> continue. > >>>>>>>> > >>>>>>>> > >>>>>>>> Ack, makes sense. > >>>>>>>> > >>>>>>>> 4. Hmm.. not sure if it will work. The main issue is that the > >>>>>>>>> consumer-coordinator behavior (whether to revoke all or none at > >>>>>>>>> onRebalancePrepare) is independent of the selected protocol's > >>>> assignor > >>>>>>>>> (eager or cooperative), so even if the assignor is selected to be > >> the > >>>>>>>>> old-versioned one, we will still not revoke at the > >>>>>>> consumer-coordinator > >>>>>>>>> layer and hence has the same risk of migrating still-owned > >>>> partitions, > >>>>>>>>> right? > >>>>>>>> > >>>>>>>> > >>>>>>>> Yeah, basically we would have to push the eager/cooperative logic > >> into > >>>>>>> the > >>>>>>>> PartitionAssignor itself and make the consumer aware of the > >> rebalance > >>>>>>>> protocol it is compatible with. As long as an eager protocol > _could_ > >>>> be > >>>>>>>> selected, the consumer would have to be pessimistic and do eager > >>>>>>>> revocation. But if all the assignors configured in the consumer > >>>> support > >>>>>>>> cooperative reassignment, then either 1) a cooperative protocol > will > >>>> be > >>>>>>>> selected and cooperative revocation can be safely used, or 2) if > the > >>>>>>> rest > >>>>>>>> of the group does not support it, then the consumer will simply > >> fail. > >>>>>>>> > >>>>>>>> Another point which you raised offline and I will repeat here is > >> that > >>>>>>> this > >>>>>>>> proposal's benefit is mostly limited to sticky assignment logic. > >>>>>>> Arguably > >>>>>>>> the range assignor may have some incidental stickiness, > particularly > >>>> if > >>>>>>> the > >>>>>>>> group is rebalancing for a newly created or deleted topic. For > other > >>>>>>> cases, > >>>>>>>> the proposal is mostly additional overhead since it takes an > >>>> additional > >>>>>>>> rebalance and many of the partitions will move. Perhaps it doesn't > >>>> make > >>>>>>> as > >>>>>>>> much sense to use the cooperative protocol for strategies like > range > >>>> and > >>>>>>>> round-robin. That kind of argues in favor of pushing some of the > >>>> control > >>>>>>>> into the assignor itself. Maybe we would not bother creating > >>>>>>>> CooperativeRange as I suggested above, but it would make sense to > >>>>>>> create a > >>>>>>>> cooperative version of the sticky assignment strategy. I thought > we > >>>>>>> might > >>>>>>>> have to create a new sticky assignor anyway because I can't see > how > >> we > >>>>>>>> would get compatible behavior mixing with the old version anyway. 
> >>>>>>>> > >>>>>>>> Thanks, > >>>>>>>> Jason > >>>>>>>> > >>>>>>>> > >>>>>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com > > > >>>>>>> wrote: > >>>>>>>> > >>>>>>>>> Hello Matthias: > >>>>>>>>> > >>>>>>>>> Thanks for your review. > >>>>>>>>> > >>>>>>>>> The background section uses streams assignor as well as the > >>>> consumer's > >>>>>>>> own > >>>>>>>>> stick assignor as examples illustrating the situation, but this > KIP > >>>> is > >>>>>>>> for > >>>>>>>>> consumer coordinator itself, and the rest of the paragraph did > not > >>>>>>> talk > >>>>>>>>> about Streams any more. If you feel it's a bit distracted I can > >>>> remove > >>>>>>>>> those examples. > >>>>>>>>> > >>>>>>>>> 10). While working on the PR I realized that the revoked > partitions > >>>> on > >>>>>>>>> assignment is not needed (this is being discussed on the PR > itself: > >>>>>>>>> https://github.com/apache/kafka/pull/6528#issuecomment-480009890 > >>>>>>>>> > >>>>>>>>> 20). 1.a. Good question, I've updated the wiki to let the > >> consumer's > >>>>>>>>> cleanup assignment and re-join, and not letting assignor making > any > >>>>>>>>> proactive changes. The idea is to keep logic simpler and not > doing > >>>> any > >>>>>>>>> "split brain" stuff. > >>>>>>>>> > >>>>>>>>> 20). 2.b. No we do not need, since the owned-partitions will be > >> part > >>>>>>> of > >>>>>>>> the > >>>>>>>>> Subscription passed in to assign() already. > >>>>>>>>> > >>>>>>>>> 30). As Boyang mentioned, there are some drawbacks that can not > be > >>>>>>>>> addressed by rebalance delay still, hence still voted KIP-345 > (some > >>>>>>> more > >>>>>>>>> details can be found on the discussion thread of KIP-345 itself). > >> One > >>>>>>>>> example is that as the instance resumes, its member id will be > >> empty > >>>>>>> so > >>>>>>>> we > >>>>>>>>> are still relying on assignor to give it the assignment from the > >> old > >>>>>>>>> member-id while keeping all other member's assignment unchanged. > >>>>>>>>> > >>>>>>>>> 40). Incomplete sentence, I've updated it. > >>>>>>>>> > >>>>>>>>> 50). Here's my idea: suppose we augment the join group schema > with > >>>>>>>>> `protocol version` in 2.3, and then with both brokers and clients > >>>>>>> being > >>>>>>>> in > >>>>>>>>> version 2.3+, on the first rolling bounce where subscription and > >>>>>>>> assignment > >>>>>>>>> schema and / or user metadata has changed, this protocol version > >> will > >>>>>>> be > >>>>>>>>> bumped. On the broker side, when receiving all member's > join-group > >>>>>>>> request, > >>>>>>>>> it will choose the one that has the highest protocol version > (also > >> it > >>>>>>>>> assumes higher versioned protocol is always backward compatible, > >> i.e. > >>>>>>> the > >>>>>>>>> coordinator can recognize lower versioned protocol as well) and > >>>>>>> select it > >>>>>>>>> as the leader. Then the leader can decide, based on its received > >> and > >>>>>>>>> deserialized subscription information, how to assign partitions > and > >>>>>>> how > >>>>>>>> to > >>>>>>>>> encode the assignment accordingly so that everyone can understand > >> it. > >>>>>>>> With > >>>>>>>>> this, in Streams for example, no version probing would be needed > >>>>>>> since we > >>>>>>>>> are guaranteed the leader knows everyone's version -- again it is > >>>>>>>> assuming > >>>>>>>>> that higher versioned protocol is always backward compatible -- > and > >>>>>>> hence > >>>>>>>>> can successfully do the assignment at that round. > >>>>>>>>> > >>>>>>>>> 60). 
My bad, this section was not updated while the design was > >>>>>>> evolved, > >>>>>>>>> I've updated it. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com> > >>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Thanks for the review Matthias! My 2-cent on the rebalance delay > >> is > >>>>>>>> that > >>>>>>>>>> it is a rather fixed trade-off between > >>>>>>>>>> > >>>>>>>>>> task availability and resource shuffling. If we eventually > trigger > >>>>>>>>>> rebalance after rolling bounce, certain consumer > >>>>>>>>>> > >>>>>>>>>> setup is still faced with global shuffles, for example > member.id > >>>>>>>> ranking > >>>>>>>>>> based round robin strategy, as rejoining dynamic > >>>>>>>>>> > >>>>>>>>>> members will be assigned with new member.id which reorders the > >>>>>>>>>> assignment. So I think the primary goal of incremental > >>>>>>>>>> > >>>>>>>>>> rebalancing is still improving the cluster availability during > >>>>>>>> rebalance, > >>>>>>>>>> because it didn't revoke any partition during this > >>>>>>>>>> > >>>>>>>>>> process. Also, the perk is minimum configuration requirement :) > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> > >>>>>>>>>> Boyang > >>>>>>>>>> > >>>>>>>>>> ________________________________ > >>>>>>>>>> From: Matthias J. Sax <matth...@confluent.io> > >>>>>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM > >>>>>>>>>> To: dev > >>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka > >>>>>>> Streams > >>>>>>>>>> > >>>>>>>>>> Thank for the KIP, Boyang and Guozhang! > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> I made an initial pass and have some questions/comments. One > high > >>>>>>> level > >>>>>>>>>> comment: it seems that the KIP "mixes" plain consumer and Kafka > >>>>>>> Streams > >>>>>>>>>> use case a little bit (at least in the presentation). It might > be > >>>>>>>>>> helpful to separate both cases clearly, or maybe limit the scope > >> to > >>>>>>>>>> plain consumer only. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new > >>>>>>> method > >>>>>>>>>> `List<TopicPartitions> revokedPartitions()` ? > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 20) In Section "Consumer Coordinator Algorithm" > >>>>>>>>>> > >>>>>>>>>> Bullet point "1a)": If the subscription changes and a topic > is > >>>>>>>>>> removed from the subscription, why do we not revoke the > >> partitions? > >>>>>>>>>> > >>>>>>>>>> Bullet point "1a)": What happens is a topic is deleted (or a > >>>>>>>>>> partition is removed/deleted from a topic)? Should we call the > new > >>>>>>>>>> `onPartitionsEmigrated()` callback for this case? > >>>>>>>>>> > >>>>>>>>>> Bullet point "2b)" Should we update the `PartitionAssignor` > >>>>>>>>>> interface to pass in the "old assignment" as third parameter > into > >>>>>>>>>> `assign()`? > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance > delay > >>>>>>>>>> subsume KIP-345? Configuring static members is rather > complicated, > >>>>>>> and > >>>>>>>> I > >>>>>>>>>> am wondering if a rebalance delay would be sufficient? > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b) > >> forever." > >>>>>>>>>> > >>>>>>>>>> What is "case 3.b" ? 
> >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 50) Section "Looking into the Future" > >>>>>>>>>> > >>>>>>>>>> Nit: the new "ProtocolVersion" field is missing in the first > line > >>>>>>>>>> describing "JoinGroupRequest" > >>>>>>>>>> > >>>>>>>>>>> This can also help saving "version probing" cost on Streams as > >>>>>>> well. > >>>>>>>>>> > >>>>>>>>>> How does this relate to Kafka Streams "version probing" > >>>>>>> implementation? > >>>>>>>>>> How can we exploit the new `ProtocolVersion` in Streams to > improve > >>>>>>>>>> "version probing" ? I have a rough idea, but would like to hear > >> more > >>>>>>>>>> details. > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> 60) Section "Recommended Upgrade Procedure" > >>>>>>>>>> > >>>>>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will > >> force > >>>>>>>> the > >>>>>>>>>> stream application to stay with protocol type "consumer". > >>>>>>>>>> > >>>>>>>>>> This config is not discussed in the KIP and appears in this > >> section > >>>>>>>>>> without context. Can you elaborate about it? > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> -Matthias > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote: > >>>>>>>>>>> Bump up on this discussion thread. I've added a few new > drawings > >>>>>>> for > >>>>>>>>>> better > >>>>>>>>>>> illustration, would really appreciate your feedbacks. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Guozhang > >>>>>>>>>>> > >>>>>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang < > >> wangg...@gmail.com > >>>>>>>> > >>>>>>>>>> wrote: > >>>>>>>>>>> > >>>>>>>>>>>> Hello Boyang, > >>>>>>>>>>>> > >>>>>>>>>>>> I've made another thorough pass over this KIP and I'd like to > >>>>>>> spilt > >>>>>>>> it > >>>>>>>>>>>> into two parts: the first part, covered in KIP-429 would be > >>>>>>> touching > >>>>>>>>> on > >>>>>>>>>>>> Consumer Coordinator only to have incremental rebalance > protocol > >>>>>>> in > >>>>>>>>>> place. > >>>>>>>>>>>> The second part (for now I've reserved KIP number 444 for it) > >>>>>>> would > >>>>>>>>>> contain > >>>>>>>>>>>> all the changes on StreamsPartitionAssginor to allow warming > up > >>>>>>> new > >>>>>>>>>>>> members. > >>>>>>>>>>>> > >>>>>>>>>>>> I think the first part, a.k.a. the current updated KIP-429 is > >>>>>>> ready > >>>>>>>>> for > >>>>>>>>>>>> review and discussions again. Would love to hear people's > >>>>>>> feedbacks > >>>>>>>>> and > >>>>>>>>>>>> ideas. > >>>>>>>>>>>> > >>>>>>>>>>>> Guozhang > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen < > >> bche...@outlook.com > >>>>>>>> > >>>>>>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined: > >>>>>>>>>>>>> > >>>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of > >>>>>>>>> "learner > >>>>>>>>>>>>> task" in addition to "standby task": if the only difference > is > >>>>>>> that > >>>>>>>>> for > >>>>>>>>>>>>> the > >>>>>>>>>>>>> latter, we would consider workload balance while for the > former > >>>>>>> we > >>>>>>>>>> would > >>>>>>>>>>>>> not, I think we can just adjust the logic of > StickyTaskAssignor > >>>>>>> a > >>>>>>>> bit > >>>>>>>>>> to > >>>>>>>>>>>>> break that difference. Adding a new type of task would be > >>>>>>> adding a > >>>>>>>>> lot > >>>>>>>>>> of > >>>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a > >>>>>>>>>> standby-task > >>>>>>>>>>>>> I would prefer to do so. 
> >>>>>>>>>>>>> In the proposal we stated that we are not adding a new type > of > >>>>>>> task > >>>>>>>>>>>>> implementation. The > >>>>>>>>>>>>> learner task shall share the same implementation with normal > >>>>>>>> standby > >>>>>>>>>>>>> task, only that we > >>>>>>>>>>>>> shall tag the standby task with learner and prioritize the > >>>>>>> learner > >>>>>>>>>> tasks > >>>>>>>>>>>>> replay effort. > >>>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself > is > >>>>>>>> which > >>>>>>>>>>>>> layer > >>>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we > >>>>>>> would > >>>>>>>>> not > >>>>>>>>>>>>> require internal implementation details but only public > facing > >>>>>>> API > >>>>>>>>>>>>> updates, > >>>>>>>>>>>>> for a KIP like this I think it still requires to flesh out > >>>>>>> details > >>>>>>>> on > >>>>>>>>>> the > >>>>>>>>>>>>> implementation design. More specifically: today Streams > embed a > >>>>>>>> full > >>>>>>>>>>>>> fledged Consumer client, which hard-code a > ConsumerCoordinator > >>>>>>>>> inside, > >>>>>>>>>>>>> Streams then injects a StreamsPartitionAssignor to its > >> pluggable > >>>>>>>>>>>>> PartitionAssignor interface and inside the > >>>>>>> StreamsPartitionAssignor > >>>>>>>>> we > >>>>>>>>>>>>> also > >>>>>>>>>>>>> have a TaskAssignor interface whose default implementation is > >>>>>>>>>>>>> StickyPartitionAssignor. Streams partition assignor logic > today > >>>>>>>> sites > >>>>>>>>>> in > >>>>>>>>>>>>> the latter two classes. Hence the hierarchy today is: > >>>>>>>>>>>>> > >>>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> > >>>>>>> StreamsPartitionAssignor -> > >>>>>>>>>>>>> StickyTaskAssignor. > >>>>>>>>>>>>> > >>>>>>>>>>>>> We need to think about where the proposed implementation > would > >>>>>>> take > >>>>>>>>>> place > >>>>>>>>>>>>> at, and personally I think it is not the best option to > inject > >>>>>>> all > >>>>>>>> of > >>>>>>>>>> them > >>>>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since > >> the > >>>>>>>>> logic > >>>>>>>>>> of > >>>>>>>>>>>>> "triggering another rebalance" etc would require some > >>>>>>> coordinator > >>>>>>>>> logic > >>>>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the > other > >>>>>>>> hand, > >>>>>>>>>>>>> since > >>>>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot > >>>>>>> just > >>>>>>>>>> replace > >>>>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator > like > >>>>>>>>> Connect > >>>>>>>>>>>>> does > >>>>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal > in > >>>>>>> both > >>>>>>>>>>>>> consumer layer and streams-assignor layer like we did in > >>>>>>>>>> KIP-98/KIP-129. > >>>>>>>>>>>>> And then the key thing to consider is how to cut off the > >>>>>>> boundary > >>>>>>>> so > >>>>>>>>>> that > >>>>>>>>>>>>> the modifications we push to ConsumerCoordinator would be > >>>>>>>> beneficial > >>>>>>>>>>>>> universally for any consumers, while keep the > Streams-specific > >>>>>>>> logic > >>>>>>>>> at > >>>>>>>>>>>>> the > >>>>>>>>>>>>> assignor level. > >>>>>>>>>>>>> Yes, that's also my ideal plan. 
The details for the > >>>>>>> implementation > >>>>>>>>> are > >>>>>>>>>>>>> depicted > >>>>>>>>>>>>> in this doc< > >>>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>> > >> > https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae > >>>>>>>>>>> , > >>>>>>>>>> [ > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>> > >> > https://lh5.googleusercontent.com/DXWMyKNE9rFFIv7TNX56Q41QwqYp8ynivwWSJHHORqSRkoQxtraW2bqiB-NRUGAMYKkt8A=w1200-h630-p > >>>>>>>>>> ]< > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>> > >> > https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> [External] KStream Smooth Auto-scaling Implementation Plan< > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>> > >> > https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae > >>>>>>>>>>> > >>>>>>>>>> docs.google.com > >>>>>>>>>> KStream Incremental Rebalancing Implementation Plan Authors: > >> Boyang > >>>>>>>> Chen, > >>>>>>>>>> Guozhang Wang KIP link Stage: [Draft | Review | Approved] > >>>>>>> Background We > >>>>>>>>>> initiated KIP-429 for the promotion of incremental rebalancing > >> work > >>>>>>> for > >>>>>>>>>> KStream. Behind the scene, there is non-trivial amount of effort > >>>>>>> that > >>>>>>>>> needs > >>>>>>>>>> to... > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>>>>> and I have explained the reasoning on why we want to push a > >>>>>>>>>>>>> global change of replacing ConsumerCoordinator with > >>>>>>>>> StreamCoordinator. > >>>>>>>>>>>>> The motivation > >>>>>>>>>>>>> is that KIP space is usually used for public & algorithm > level > >>>>>>>>> change, > >>>>>>>>>>>>> not for internal > >>>>>>>>>>>>> implementation details. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 3. Depending on which design direction we choose, our > migration > >>>>>>>> plan > >>>>>>>>>> would > >>>>>>>>>>>>> also be quite different. For example, if we stay with > >>>>>>>>>> ConsumerCoordinator > >>>>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to > >>>>>>> make > >>>>>>>>> all > >>>>>>>>>>>>> changes agnostic to brokers as well as to old versioned > >>>>>>> consumers, > >>>>>>>>> then > >>>>>>>>>>>>> our > >>>>>>>>>>>>> migration plan could be much easier. > >>>>>>>>>>>>> Yes, the upgrade plan was designed to take the new > >>>>>>>> StreamCoordinator > >>>>>>>>>>>>> approach > >>>>>>>>>>>>> which means we shall define a new protocol type. For existing > >>>>>>>>>> application > >>>>>>>>>>>>> we could only > >>>>>>>>>>>>> maintain the same `consumer` protocol type is because current > >>>>>>>> broker > >>>>>>>>>> only > >>>>>>>>>>>>> allows > >>>>>>>>>>>>> change of protocol type when the consumer group is empty. It > is > >>>>>>> of > >>>>>>>>>> course > >>>>>>>>>>>>> user-unfriendly to force > >>>>>>>>>>>>> a wipe-out for the entire application, and I don't think > >>>>>>>> maintaining > >>>>>>>>>> old > >>>>>>>>>>>>> protocol type would greatly > >>>>>>>>>>>>> impact ongoing services using new stream coordinator. WDYT? > >>>>>>>>>>>>> > >>>>>>>>>>>>> 4. 
I think one major issue related to this KIP is that today, > >> in > >>>>>>>> the > >>>>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness > over > >>>>>>>>>> workload > >>>>>>>>>>>>> balance, and hence "learner task" is needed to break this > >>>>>>> priority, > >>>>>>>>> but > >>>>>>>>>>>>> I'm > >>>>>>>>>>>>> wondering if we can have a better solution within sticky task > >>>>>>>>> assignor > >>>>>>>>>>>>> that > >>>>>>>>>>>>> accommodate this? > >>>>>>>>>>>>> Great question! That's what I explained in the proposal, > which > >>>>>>> is > >>>>>>>>> that > >>>>>>>>>> we > >>>>>>>>>>>>> should breakdown our > >>>>>>>>>>>>> delivery into different stages. At very beginning, our goal > is > >>>>>>> to > >>>>>>>>>> trigger > >>>>>>>>>>>>> learner task assignment only on > >>>>>>>>>>>>> `new` hosts, where we shall leverage leader's knowledge of > >>>>>>> previous > >>>>>>>>>> round > >>>>>>>>>>>>> of rebalance to figure out. After > >>>>>>>>>>>>> stage one, our goal is to have a smooth scaling up > experience, > >>>>>>> but > >>>>>>>>> the > >>>>>>>>>>>>> task balance problem is kind of orthogonal. > >>>>>>>>>>>>> The load balance problem is a much broader topic than auto > >>>>>>> scaling, > >>>>>>>>>> which > >>>>>>>>>>>>> I figure worth discussing within > >>>>>>>>>>>>> this KIP's context since it's a naturally next-step, but > >>>>>>> wouldn't > >>>>>>>> be > >>>>>>>>>> the > >>>>>>>>>>>>> main topic. > >>>>>>>>>>>>> Learner task or auto scaling support should be treated as `a > >>>>>>>> helpful > >>>>>>>>>>>>> mechanism to reach load balance`, but not `an algorithm > >> defining > >>>>>>>> load > >>>>>>>>>>>>> balance`. It would be great if you could share some insights > of > >>>>>>> the > >>>>>>>>>> stream > >>>>>>>>>>>>> task balance, which eventually helps us to break out of the > >>>>>>>> KIP-429's > >>>>>>>>>> scope > >>>>>>>>>>>>> and even define a separate KIP to focus on task weight & > >>>>>>> assignment > >>>>>>>>>> logic > >>>>>>>>>>>>> improvement. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Also thank you for making improvement on the KIP context and > >>>>>>>>>> organization! > >>>>>>>>>>>>> > >>>>>>>>>>>>> Best, > >>>>>>>>>>>>> Boyang > >>>>>>>>>>>>> ________________________________ > >>>>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com> > >>>>>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM > >>>>>>>>>>>>> To: dev > >>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for > Kafka > >>>>>>>>> Streams > >>>>>>>>>>>>> > >>>>>>>>>>>>> Hello Boyang, > >>>>>>>>>>>>> > >>>>>>>>>>>>> I've just made a quick pass on the KIP and here are some > >>>>>>> thoughts. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Meta: > >>>>>>>>>>>>> > >>>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of > >>>>>>>>> "learner > >>>>>>>>>>>>> task" in addition to "standby task": if the only difference > is > >>>>>>> that > >>>>>>>>> for > >>>>>>>>>>>>> the > >>>>>>>>>>>>> latter, we would consider workload balance while for the > former > >>>>>>> we > >>>>>>>>>> would > >>>>>>>>>>>>> not, I think we can just adjust the logic of > StickyTaskAssignor > >>>>>>> a > >>>>>>>> bit > >>>>>>>>>> to > >>>>>>>>>>>>> break that difference. Adding a new type of task would be > >>>>>>> adding a > >>>>>>>>> lot > >>>>>>>>>> of > >>>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a > >>>>>>>>>> standby-task > >>>>>>>>>>>>> I would prefer to do so. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 2. 
One thing that's still not clear from the KIP wiki itself > is > >>>>>>>> which > >>>>>>>>>>>>> layer > >>>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we > >>>>>>> would > >>>>>>>>> not > >>>>>>>>>>>>> require internal implementation details but only public > facing > >>>>>>> API > >>>>>>>>>>>>> updates, > >>>>>>>>>>>>> for a KIP like this I think it still requires to flesh out > >>>>>>> details > >>>>>>>> on > >>>>>>>>>> the > >>>>>>>>>>>>> implementation design. More specifically: today Streams > embed a > >>>>>>>> full > >>>>>>>>>>>>> fledged Consumer client, which hard-code a > ConsumerCoordinator > >>>>>>>>> inside, > >>>>>>>>>>>>> Streams then injects a StreamsPartitionAssignor to its > plugable > >>>>>>>>>>>>> PartitionAssignor interface and inside the > >>>>>>> StreamsPartitionAssignor > >>>>>>>>> we > >>>>>>>>>>>>> also > >>>>>>>>>>>>> have a TaskAssignor interface whose default implementation is > >>>>>>>>>>>>> StickyPartitionAssignor. Streams partition assignor logic > today > >>>>>>>> sites > >>>>>>>>>> in > >>>>>>>>>>>>> the latter two classes. Hence the hierarchy today is: > >>>>>>>>>>>>> > >>>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> > >>>>>>> StreamsPartitionAssignor -> > >>>>>>>>>>>>> StickyTaskAssignor. > >>>>>>>>>>>>> > >>>>>>>>>>>>> We need to think about where the proposed implementation > would > >>>>>>> take > >>>>>>>>>> place > >>>>>>>>>>>>> at, and personally I think it is not the best option to > inject > >>>>>>> all > >>>>>>>> of > >>>>>>>>>> them > >>>>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since > >> the > >>>>>>>>> logic > >>>>>>>>>> of > >>>>>>>>>>>>> "triggering another rebalance" etc would require some > >>>>>>> coordinator > >>>>>>>>> logic > >>>>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the > other > >>>>>>>> hand, > >>>>>>>>>>>>> since > >>>>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot > >>>>>>> just > >>>>>>>>>> replace > >>>>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator > like > >>>>>>>>> Connect > >>>>>>>>>>>>> does > >>>>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal > in > >>>>>>> both > >>>>>>>>>>>>> consumer layer and streams-assignor layer like we did in > >>>>>>>>>> KIP-98/KIP-129. > >>>>>>>>>>>>> And then the key thing to consider is how to cut off the > >>>>>>> boundary > >>>>>>>> so > >>>>>>>>>> that > >>>>>>>>>>>>> the modifications we push to ConsumerCoordinator would be > >>>>>>>> beneficial > >>>>>>>>>>>>> universally for any consumers, while keep the > Streams-specific > >>>>>>>> logic > >>>>>>>>> at > >>>>>>>>>>>>> the > >>>>>>>>>>>>> assignor level. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 3. Depending on which design direction we choose, our > migration > >>>>>>>> plan > >>>>>>>>>> would > >>>>>>>>>>>>> also be quite different. For example, if we stay with > >>>>>>>>>> ConsumerCoordinator > >>>>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to > >>>>>>> make > >>>>>>>>> all > >>>>>>>>>>>>> changes agnostic to brokers as well as to old versioned > >>>>>>> consumers, > >>>>>>>>> then > >>>>>>>>>>>>> our > >>>>>>>>>>>>> migration plan could be much easier. > >>>>>>>>>>>>> > >>>>>>>>>>>>> 4. 
I think one major issue related to this KIP is that today, > >> in > >>>>>>>> the > >>>>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness > over > >>>>>>>>>> workload > >>>>>>>>>>>>> balance, and hence "learner task" is needed to break this > >>>>>>> priority, > >>>>>>>>> but > >>>>>>>>>>>>> I'm > >>>>>>>>>>>>> wondering if we can have a better solution within sticky task > >>>>>>>>> assignor > >>>>>>>>>>>>> that > >>>>>>>>>>>>> accommodate this? > >>>>>>>>>>>>> > >>>>>>>>>>>>> Minor: > >>>>>>>>>>>>> > >>>>>>>>>>>>> 1. The idea of two rebalances have also been discussed in > >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we > should > >>>>>>> add > >>>>>>>>> the > >>>>>>>>>>>>> reference on the wiki page as well. > >>>>>>>>>>>>> 2. Could you also add a section describing how the > subscription > >>>>>>> / > >>>>>>>>>>>>> assignment metadata will be re-formatted? Without this > >>>>>>> information > >>>>>>>> it > >>>>>>>>>> is > >>>>>>>>>>>>> hard to get to the bottom of your idea. For example in the > >>>>>>> "Leader > >>>>>>>>>>>>> Transfer > >>>>>>>>>>>>> Before Scaling" section, I'm not sure why "S2 doesn't know S4 > >> is > >>>>>>>> new > >>>>>>>>>>>>> member" > >>>>>>>>>>>>> and hence would blindly obey stickiness over workload balance > >>>>>>>>>> requirement. > >>>>>>>>>>>>> > >>>>>>>>>>>>> Guozhang > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen < > >>>>>>> bche...@outlook.com> > >>>>>>>>>> wrote: > >>>>>>>>>>>>> > >>>>>>>>>>>>>> Hey community friends, > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> I'm gladly inviting you to have a look at the proposal to > add > >>>>>>>>>>>>> incremental > >>>>>>>>>>>>>> rebalancing to Kafka Streams, A.K.A auto-scaling support. > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Special thanks to Guozhang for giving great guidances and > >>>>>>>> important > >>>>>>>>>>>>>> feedbacks while making this KIP! > >>>>>>>>>>>>>> > >>>>>>>>>>>>>> Best, > >>>>>>>>>>>>>> Boyang > >>>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> > >>>>>>>>>>>>> -- > >>>>>>>>>>>>> -- Guozhang > >>>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> -- > >>>>>>>>>>>> -- Guozhang > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>> > >>>>>>>>>> > >>>>>>>>> > >>>>>>>>> -- > >>>>>>>>> -- Guozhang > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>>>> > >>>>>>> -- > >>>>>>> -- Guozhang > >>>>>>> > >>>>>> > >>>>>> > >>>>>> -- > >>>>>> -- Guozhang > >>>>>> > >>>>> > >>>>> > >>>> > >>>> > >>> > >> > >> > > > > -- -- Guozhang
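[To make the poll() discussion at the top of this thread concrete, here is a rough sketch of what a caller might observe under the incremental protocol. It assumes a KafkaConsumer<byte[], byte[]> named "consumer" and a topic list "topics"; commitOffsetsFor, initStateFor, process and the "running" flag are placeholders, and the exact callback semantics are what the KIP still has to pin down:

    import java.time.Duration;
    import java.util.Collection;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.TopicPartition;

    consumer.subscribe(topics, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
            // Under cooperative rebalancing this should contain only the
            // partitions actually migrating away, possibly an empty set.
            commitOffsetsFor(revoked);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> added) {
            // Only the newly added partitions, not the full assignment.
            initStateFor(added);
        }
    });

    while (running) {
        // While a rebalance is still in flight, consecutive polls may return
        // empty batches even though some partitions remain owned.
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        process(records);
    }
]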