Thanks for the KIP and for starting the VOTE, Guozhang. I am overall +1. One follow-up thought: the KIP does not discuss in detail how `poll()` will behave after the change. It might actually be important to make `poll()` non-blocking, so that an application can keep processing data from non-revoked partitions while a rebalance is happening in the background.
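To make the intent concrete, here is a rough sketch of the processing loop this would enable. The cooperative callback semantics are the KIP's proposal, but the class and helper names below are illustrative only, not from the KIP:

    // A minimal sketch, assuming cooperative semantics where the rebalance
    // listener only receives the incremental diff, not the full assignment.
    import java.time.Duration;
    import java.util.Collection;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class NonBlockingPollSketch {
        public static void run(KafkaConsumer<String, String> consumer, List<String> topics) {
            consumer.subscribe(topics, new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> revoked) {
                    // only the partitions actually migrating away; commit
                    // their progress before giving them up
                    consumer.commitSync();
                }
                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> assigned) {
                    // only the newly added partitions
                }
            });
            while (true) {
                // If poll() stays non-blocking while a rebalance runs in the
                // background, records from non-revoked partitions keep
                // flowing here and can be processed without interruption.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(record -> System.out.println(record.value()));
            }
        }
    }

With today's eager protocol, by contrast, the listener sees the full assignment and processing pauses for the whole rebalance.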
Thoughts?

-Matthias

On 5/10/19 1:10 AM, Guozhang Wang wrote:
> Hello Matthias,
>
> I'm proposing to change this behavior holistically inside ConsumerCoordinator actually. In other words, I'm trying to piggy-back this behavioral fix of KAFKA-4600 along with this KIP, and the motivation for me to do this piggy-backing is that, with incremental rebalancing, there would be partially affected partitions as we are not revoking everybody any more.
>
> Guozhang
>
> On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io> wrote:
>> Thanks Guozhang!
>>
>> The simplified upgrade path is great!
>>
>> Just a clarification question about the "Rebalance Callback Error Handling" -- does this change affect the `ConsumerCoordinator` only if incremental rebalancing is used? Or does the behavior also change for the eager rebalancing case?
>>
>> -Matthias
>>
>> On 5/9/19 3:37 AM, Guozhang Wang wrote:
>>> Hello all,
>>>
>>> Thanks to everyone who has shared their feedback on this KIP! If there are no further comments I'll start the voting thread by end of tomorrow.
>>>
>>> Guozhang.
>>>
>>> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>> Hello Boyang,
>>>>
>>>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com> wrote:
>>>>> Hey Guozhang,
>>>>>
>>>>> thank you for the great write-up. Overall the motivation and changes LGTM, just some minor comments:
>>>>>
>>>>> 1. In "Consumer Coordinator Algorithm", we could reorder the lettered points 3d~3f from ["ready-to-migrate-partitions", "unknown-but-owned-partitions", "maybe-revoking-partitions"] to ["maybe-revoking-partitions", "ready-to-migrate-partitions", "unknown-but-owned-partitions"] in order to be consistent with 3c1~3.
>>>>
>>>> Ack. Updated.
>>>>
>>>>> 2. In "Consumer Coordinator Algorithm", 1c suggests revoking all partitions upon heartbeat/commit failure. What's the gain here? Do we want to keep all partitions running at this moment, to be optimistic for the case when no partitions get reassigned?
>>>>
>>>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can just re-join the group with all the currently owned partitions encoded. Updated.
>>>>
>>>>> 3. In "Recommended Upgrade Procedure", remove the extra 'those': "The 'sticky' assignor works even those there are"
>>>>
>>>> Ack, should be `even when`.
>>>>
>>>>> 4. Put the two "looking into the future" sections into a separate category from the migration section. It seems inconsistent for readers to see them before we finish discussing everything else.
>>>>
>>>> Ack.
>>>>
>>>>> 5. Have we discussed the concern on serialization? Could the new metadata we are adding grow larger than the message size cap?
>>>>
>>>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149 which should largely reduce the message size (will update the wiki accordingly as well).
>>>>
>>>>> Boyang
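To make the re-join behavior in the answer to point 2 above concrete, here is a sketch of a subscription that carries the currently owned partitions. The shape is hypothetical, not the KIP's final schema:

    // Hypothetical shape, not the KIP's final schema: each member encodes the
    // partitions it currently owns into its (re-)join subscription, so that on
    // REBALANCE_IN_PROGRESS nothing has to be revoked up front and the leader
    // can compute an incremental assignment.
    import java.nio.ByteBuffer;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    public class SubscriptionSketch {
        final List<String> topics;                    // as today
        final ByteBuffer userData;                    // as today
        final List<TopicPartition> ownedPartitions;   // new: currently owned

        public SubscriptionSketch(List<String> topics, ByteBuffer userData,
                                  List<TopicPartition> ownedPartitions) {
            this.topics = topics;
            this.userData = userData;
            this.ownedPartitions = ownedPartitions;
        }
    }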
>>>>> ________________________________
>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>> Sent: Monday, April 15, 2019 9:20 AM
>>>>> To: dev
>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>>>
>>>>> Hello Jason,
>>>>>
>>>>> I agree with you that for range / round-robin it makes less sense to be compatible with the cooperative rebalance protocol.
>>>>>
>>>>> As for StickyAssignor, however, I think it would still be possible to make the current implementation compatible with cooperative rebalance. So after pondering the different options at hand, I'm now proposing the approach listed in the upgrade section:
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath
>>>>>
>>>>> The idea is to let assignors specify which protocols they would work with, each associated with a different name; the upgrade path would then involve a "compatible" protocol which actually still uses eager behavior while encoding two assignors if possible. In the "Rejected" section (just to clarify, I'm not finalizing it as rejected, just putting it there for now; if we like that one instead we can always switch them) I listed the other approach we once discussed, arguing that its con of duplicated classes seems to outweigh the pro of saving the "rebalance.protocol" config.
>>>>>
>>>>> Let me know WDYT.
>>>>>
>>>>> Guozhang
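As a concrete illustration of "let assignors specify which protocols they would work with", a sketch with hypothetical names (not the KIP's final API):

    // Hypothetical names, not the KIP's final API: each assignor declares
    // which rebalance protocols it can work with, and the consumer picks the
    // most conservative protocol across all configured assignors.
    import java.util.List;

    interface ProtocolAwareAssignor {
        enum RebalanceProtocol { EAGER, COOPERATIVE }

        // e.g. a "compatible" sticky assignor would return both, while a
        // plain eager assignor returns only EAGER
        List<RebalanceProtocol> supportedProtocols();

        // the protocol name registered with the group coordinator
        String name();
    }

Under this shape, the consumer would have to fall back to eager revocation whenever any configured assignor only supports EAGER, which matches the pessimistic behavior Jason describes below.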
>>>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io> wrote:
>>>>>> Hi Guozhang,
>>>>>>
>>>>>> Responses below:
>>>>>>
>>>>>>> 2. The interface's default implementation will just be `onPartitionRevoked`, so for users' instantiations, if they do not make any code changes they should be able to recompile the code and continue.
>>>>>>
>>>>>> Ack, makes sense.
>>>>>>
>>>>>>> 4. Hmm.. not sure if it will work. The main issue is that the consumer-coordinator behavior (whether to revoke all or none at onRebalancePrepare) is independent of the selected protocol's assignor (eager or cooperative), so even if the assignor is selected to be the old-versioned one, we will still not revoke at the consumer-coordinator layer and hence have the same risk of migrating still-owned partitions, right?
>>>>>>
>>>>>> Yeah, basically we would have to push the eager/cooperative logic into the PartitionAssignor itself and make the consumer aware of the rebalance protocol it is compatible with. As long as an eager protocol _could_ be selected, the consumer would have to be pessimistic and do eager revocation. But if all the assignors configured in the consumer support cooperative reassignment, then either 1) a cooperative protocol will be selected and cooperative revocation can be safely used, or 2) if the rest of the group does not support it, then the consumer will simply fail.
>>>>>>
>>>>>> Another point which you raised offline and I will repeat here is that this proposal's benefit is mostly limited to sticky assignment logic. Arguably the range assignor may have some incidental stickiness, particularly if the group is rebalancing for a newly created or deleted topic. For other cases, the proposal is mostly additional overhead since it takes an additional rebalance and many of the partitions will move. Perhaps it doesn't make as much sense to use the cooperative protocol for strategies like range and round-robin. That kind of argues in favor of pushing some of the control into the assignor itself. Maybe we would not bother creating CooperativeRange as I suggested above, but it would make sense to create a cooperative version of the sticky assignment strategy. I thought we might have to create a new sticky assignor anyway, because I can't see how we would get compatible behavior mixing with the old version.
>>>>>>
>>>>>> Thanks,
>>>>>> Jason
>>>>>>
>>>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>> Hello Matthias:
>>>>>>>
>>>>>>> Thanks for your review.
>>>>>>>
>>>>>>> The background section uses the Streams assignor as well as the consumer's own sticky assignor as examples illustrating the situation, but this KIP is for the consumer coordinator itself, and the rest of the paragraph does not talk about Streams any more. If you feel it's distracting I can remove those examples.
>>>>>>>
>>>>>>> 10). While working on the PR I realized that the revoked partitions on assignment are not needed (this is being discussed on the PR itself: https://github.com/apache/kafka/pull/6528#issuecomment-480009890).
>>>>>>>
>>>>>>> 20). 1.a. Good question. I've updated the wiki to let the consumer clean up its assignment and re-join, and not let the assignor make any proactive changes. The idea is to keep the logic simpler and not do any "split brain" stuff.
>>>>>>>
>>>>>>> 20). 2.b. No, we do not need it, since the owned partitions will be part of the Subscription passed in to assign() already.
>>>>>>>
>>>>>>> 30). As Boyang mentioned, there are some drawbacks that still cannot be addressed by a rebalance delay, hence we still voted for KIP-345 (some more details can be found on the discussion thread of KIP-345 itself). One example is that as the instance resumes, its member id will be empty, so we are still relying on the assignor to give it the assignment from the old member-id while keeping all other members' assignments unchanged.
>>>>>>>
>>>>>>> 40). Incomplete sentence, I've updated it.
>>>>>>>
>>>>>>> 50). Here's my idea: suppose we augment the join group schema with `protocol version` in 2.3. Then, with both brokers and clients being on version 2.3+, on the first rolling bounce where the subscription and assignment schema and/or user metadata has changed, this protocol version will be bumped. On the broker side, when receiving all members' join-group requests, it will choose the member that has the highest protocol version (this also assumes a higher-versioned protocol is always backward compatible, i.e. the coordinator can recognize lower-versioned protocols as well) and select it as the leader. Then the leader can decide, based on its received and deserialized subscription information, how to assign partitions and how to encode the assignment accordingly so that everyone can understand it. With this, in Streams for example, no version probing would be needed since we are guaranteed the leader knows everyone's version -- again assuming that a higher-versioned protocol is always backward compatible -- and hence can successfully do the assignment in that round.
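A sketch of the leader-selection rule in 50) above; the member representation is a hypothetical stand-in, only the selection rule itself is from the discussion:

    // Illustrative sketch: pick as group leader the member advertising the
    // highest protocol version, relying on the stated assumption that a
    // higher-versioned protocol can always parse lower-versioned metadata.
    import java.util.Comparator;
    import java.util.List;

    public class LeaderSelectionSketch {
        // hypothetical stand-in for a member's join-group metadata
        static final class Member {
            final String memberId;
            final int protocolVersion;
            Member(String memberId, int protocolVersion) {
                this.memberId = memberId;
                this.protocolVersion = protocolVersion;
            }
        }

        static Member selectLeader(List<Member> members) {
            return members.stream()
                    .max(Comparator.comparingInt((Member m) -> m.protocolVersion))
                    .orElseThrow(IllegalStateException::new);
        }
    }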
>>>>>>> 60). My bad, this section was not updated as the design evolved; I've updated it.
>>>>>>>
>>>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com> wrote:
>>>>>>>> Thanks for the review Matthias! My 2 cents on the rebalance delay: it is a rather fixed trade-off between task availability and resource shuffling. If we eventually trigger a rebalance after a rolling bounce, certain consumer setups are still faced with global shuffles, for example a member.id-ranking-based round-robin strategy, as rejoining dynamic members will be assigned new member.ids, which reorders the assignment. So I think the primary goal of incremental rebalancing is still improving cluster availability during a rebalance, because it doesn't revoke any partitions during the process. Also, the perk is the minimal configuration requirement :)
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Boyang
>>>>>>>>
>>>>>>>> ________________________________
>>>>>>>> From: Matthias J. Sax <matth...@confluent.io>
>>>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM
>>>>>>>> To: dev
>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>>>>>>
>>>>>>>> Thanks for the KIP, Boyang and Guozhang!
>>>>>>>>
>>>>>>>> I made an initial pass and have some questions/comments. One high-level comment: it seems that the KIP "mixes" the plain consumer and Kafka Streams use cases a little bit (at least in the presentation). It might be helpful to separate both cases clearly, or maybe limit the scope to the plain consumer only.
>>>>>>>>
>>>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new method `List<TopicPartitions> revokedPartitions()`?
>>>>>>>>
>>>>>>>> 20) In Section "Consumer Coordinator Algorithm"
>>>>>>>>
>>>>>>>> Bullet point "1a)": If the subscription changes and a topic is removed from the subscription, why do we not revoke the partitions?
>>>>>>>>
>>>>>>>> Bullet point "1a)": What happens if a topic is deleted (or a partition is removed/deleted from a topic)? Should we call the new `onPartitionsEmigrated()` callback for this case?
>>>>>>>>
>>>>>>>> Bullet point "2b)": Should we update the `PartitionAssignor` interface to pass in the "old assignment" as a third parameter into `assign()`?
>>>>>>>>
>>>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay subsume KIP-345? Configuring static members is rather complicated, and I am wondering if a rebalance delay would be sufficient?
>>>>>>>>
>>>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b) forever."
>>>>>>>>
>>>>>>>> What is "case 3.b"?
>>>>>>>>
>>>>>>>> 50) Section "Looking into the Future"
>>>>>>>>
>>>>>>>> Nit: the new "ProtocolVersion" field is missing in the first line describing "JoinGroupRequest"
>>>>>>>>
>>>>>>>>> This can also help saving "version probing" cost on Streams as well.
>>>>>>>>
>>>>>>>> How does this relate to the Kafka Streams "version probing" implementation? How can we exploit the new `ProtocolVersion` in Streams to improve "version probing"? I have a rough idea, but would like to hear more details.
>>>>>>>>
>>>>>>>> 60) Section "Recommended Upgrade Procedure"
>>>>>>>>
>>>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with protocol type "consumer".
>>>>>>>>
>>>>>>>> This config is not discussed in the KIP and appears in this section without context. Can you elaborate on it?
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote:
>>>>>>>>> Bumping up this discussion thread. I've added a few new drawings for better illustration; would really appreciate your feedback.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>> Hello Boyang,
>>>>>>>>>>
>>>>>>>>>> I've made another thorough pass over this KIP and I'd like to split it into two parts: the first part, covered in KIP-429, would touch on the Consumer Coordinator only, to put the incremental rebalance protocol in place. The second part (for now I've reserved KIP number 444 for it) would contain all the changes on StreamsPartitionAssignor to allow warming up new members.
>>>>>>>>>>
>>>>>>>>>> I think the first part, a.k.a. the current updated KIP-429, is ready for review and discussion again. Would love to hear people's feedback and ideas.
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com> wrote:
>>>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined:
>>>>>>>>>>>
>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of "learner task" in addition to "standby task": if the only difference is that for the latter, we would consider workload balance while for the former we would not, I think we can just adjust the logic of StickyTaskAssignor a bit to break that difference. Adding a new type of task would be adding a lot of code complexity, so if we can still piggy-back the logic on a standby-task I would prefer to do so.
>>>>>>>>>>>
>>>>>>>>>>> In the proposal we stated that we are not adding a new type of task implementation. The learner task shall share the same implementation with the normal standby task; we only tag the standby task as a learner and prioritize the learner tasks' replay effort.
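A sketch of the tagging idea in the answer above; this is illustrative only, not actual Streams internals:

    // Illustrative, not actual Streams internals: a learner is not a new task
    // type, just a standby task tagged so that its restoration is prioritized.
    import java.util.Comparator;
    import java.util.List;

    public class StandbyTaskSketch {
        final String taskId;
        final boolean learner;  // tagged standby; same implementation otherwise

        public StandbyTaskSketch(String taskId, boolean learner) {
            this.taskId = taskId;
            this.learner = learner;
        }

        // restore learner-tagged standbys first, then the regular standbys
        static void prioritizeLearners(List<StandbyTaskSketch> tasks) {
            tasks.sort(Comparator.comparing((StandbyTaskSketch t) -> !t.learner));
        }
    }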
>>>>>>>>>>>
>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is which layer the logic would be implemented at. Although for most KIPs we would not require internal implementation details but only public-facing API updates, for a KIP like this I think it still requires fleshing out details of the implementation design. More specifically: today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside; Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyPartitionAssignor. Streams' partition assignor logic today sits in the latter two classes. Hence the hierarchy today is:
>>>>>>>>>>>
>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.
>>>>>>>>>>>
>>>>>>>>>>> We need to think about where the proposed implementation would take place, and personally I think it is not the best option to inject all of it into the StreamsPartitionAssignor / StickyTaskAssignor, since the logic of "triggering another rebalance" etc. would require some coordinator logic which is hard to mimic at the PartitionAssignor level. On the other hand, since we are embedding a KafkaConsumer client as a whole, we cannot just replace ConsumerCoordinator with a specialized StreamsCoordinator like Connect does in KIP-415. So I'd like to maybe split the current proposal into both the consumer layer and the streams-assignor layer, like we did in KIP-98/KIP-129. And then the key thing to consider is how to cut the boundary so that the modifications we push to ConsumerCoordinator would be beneficial universally for any consumer, while keeping the Streams-specific logic at the assignor level.
>>>>>>>>>>>
>>>>>>>>>>> Yes, that's also my ideal plan. The details for the implementation are depicted in this doc: https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae and I have explained the reasoning on why we want to push a global change of replacing ConsumerCoordinator with StreamCoordinator. The motivation is that KIP space is usually used for public & algorithm-level changes, not for internal implementation details.
>>>>>>>>>>>
>>>>>>>>>>> 3. Depending on which design direction we choose, our migration plan would also be quite different. For example, if we stay with ConsumerCoordinator, whose protocol type is still "consumer", and we can manage to make all changes agnostic to brokers as well as to old-versioned consumers, then our migration plan could be much easier.
>>>>>>>>>>>
>>>>>>>>>>> Yes, the upgrade plan was designed to take the new StreamCoordinator approach, which means we shall define a new protocol type. The reason we could only maintain the same `consumer` protocol type for existing applications is that the current broker only allows a change of protocol type when the consumer group is empty. It is of course user-unfriendly to force a wipe-out of the entire application, and I don't think maintaining the old protocol type would greatly impact ongoing services using the new stream coordinator. WDYT?
>>>>>>>>>>>
>>>>>>>>>>> 4. I think one major issue related to this KIP is that today, in the StickyPartitionAssignor, we always try to honor stickiness over workload balance, and hence "learner task" is needed to break this priority, but I'm wondering if we can have a better solution within the sticky task assignor that accommodates this?
>>>>>>>>>>>
>>>>>>>>>>> Great question! That's what I explained in the proposal: we should break down our delivery into different stages. At the very beginning, our goal is to trigger learner task assignment only on `new` hosts, which we shall figure out by leveraging the leader's knowledge of the previous round of rebalance. After stage one, our goal is a smooth scaling-up experience, but the task balance problem is kind of orthogonal. The load balance problem is a much broader topic than auto scaling, which I figure is worth discussing within this KIP's context since it's a natural next step, but it wouldn't be the main topic. Learner tasks and auto-scaling support should be treated as `a helpful mechanism to reach load balance`, not `an algorithm defining load balance`. It would be great if you could share some insights on stream task balance, which would eventually help us break out of KIP-429's scope and even define a separate KIP focused on task weight & assignment logic improvement.
>>>>>>>>>>>
>>>>>>>>>>> Also thank you for making improvements to the KIP's context and organization!
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Boyang
>>>>>>>>>>>
>>>>>>>>>>> ________________________________
>>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM
>>>>>>>>>>> To: dev
>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>>>>>>>>>
>>>>>>>>>>> Hello Boyang,
>>>>>>>>>>>
>>>>>>>>>>> I've just made a quick pass on the KIP and here are some thoughts.
>>>>>>>>>>>
>>>>>>>>>>> Meta:
>>>>>>>>>>>
>>>>>>>>>>> [...]
>>>>>>>>>>>
>>>>>>>>>>> Minor:
>>>>>>>>>>>
>>>>>>>>>>> 1. The idea of two rebalances has also been discussed in https://issues.apache.org/jira/browse/KAFKA-6145, so we should add the reference on the wiki page as well.
>>>>>>>>>>>
>>>>>>>>>>> 2. Could you also add a section describing how the subscription / assignment metadata will be re-formatted? Without this information it is hard to get to the bottom of your idea. For example, in the "Leader Transfer Before Scaling" section, I'm not sure why "S2 doesn't know S4 is new member" and hence would blindly obey stickiness over the workload balance requirement.
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <bche...@outlook.com> wrote:
>>>>>>>>>>>> Hey community friends,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm gladly inviting you to have a look at the proposal to add incremental rebalancing to Kafka Streams, a.k.a. auto-scaling support.
>>>>>>>>>>>>
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
>>>>>>>>>>>>
>>>>>>>>>>>> Special thanks to Guozhang for giving great guidance and important feedback while making this KIP!
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Boyang
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang