Thanks Guozhang! The simplified upgrade path is great!
Just a clarification question about the "Rebalance Callback Error Handling" -- does this change affect the `ConsumerCoordinator` only if incremental rebalancing is used? Or does the behavior also change for the eager rebalancing case?

-Matthias

On 5/9/19 3:37 AM, Guozhang Wang wrote:
> Hello all,
>
> Thanks to everyone who's shared their feedback for this KIP! If there are no further comments I'll start the voting thread by end of tomorrow.
>
> Guozhang.
>
> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Boyang,
>>
>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com> wrote:
>>
>>> Hey Guozhang,
>>>
>>> thank you for the great write-up. Overall the motivation and changes LGTM, just some minor comments:
>>>
>>> 1. In "Consumer Coordinator Algorithm", we could reorder the lettered points 3d~3f from ["ready-to-migrate-partitions", "unknown-but-owned-partitions", "maybe-revoking-partitions"] to ["maybe-revoking-partitions", "ready-to-migrate-partitions", "unknown-but-owned-partitions"] in order to be consistent with 3c1~3.
>>
>> Ack. Updated.
>>
>>> 2. In "Consumer Coordinator Algorithm", 1c suggests revoking all partitions upon heartbeat/commit failure. What's the gain here? Do we want to keep all partitions running at this moment, to be optimistic for the case when no partitions get reassigned?
>>
>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can just re-join the group with all the currently owned partitions encoded. Updated.
>>
>>> 3. In "Recommended Upgrade Procedure", remove the extra 'those': "The 'sticky' assignor works even those there are"
>>
>> Ack, should be `even when`.
>>
>>> 4. Put the two "looking into the future" parts into a separate category from the migration section. It seems inconsistent for readers to see them before we've finished discussing everything else.
>>
>> Ack.
>>
>>> 5. Have we discussed the concern about serialization? Could the new metadata we are adding grow larger than the message size cap?
>>
>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149 which should largely reduce the message size (will update the wiki accordingly as well).
>>
>>>
>>> Boyang
>>>
>>> ________________________________
>>> From: Guozhang Wang <wangg...@gmail.com>
>>> Sent: Monday, April 15, 2019 9:20 AM
>>> To: dev
>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>
>>> Hello Jason,
>>>
>>> I agree with you that for range / round-robin it makes less sense to be compatible with the cooperative rebalance protocol.
>>>
>>> As for the StickyAssignor, however, I think it would still be possible to make the current implementation compatible with cooperative rebalancing. So after pondering the different options at hand I'm now proposing this approach, as listed in the upgrade section:
>>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath
>>>
>>> The idea is to let each assignor specify which protocols it works with, associating each with a different name; the upgrade path would then involve a "compatible" protocol which still uses the eager behavior while encoding two assignors if possible.
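As a rough sketch of how "assignors specify which protocols they work with" could look in code (the names here are illustrative, not the final KIP-429 API):

    import java.util.Collections;
    import java.util.List;

    // Illustrative only: not the final KIP-429 names.
    enum RebalanceProtocol {
        EAGER,        // revoke everything before re-joining
        COOPERATIVE;  // keep owned partitions, revoke incrementally later
    }

    interface PartitionAssignorSketch {
        // Existing assignors default to EAGER, so a plain recompile keeps
        // today's revoke-all behavior.
        default List<RebalanceProtocol> supportedProtocols() {
            return Collections.singletonList(RebalanceProtocol.EAGER);
        }
        // ... plus the existing subscription()/assign()/onAssignment() ...
    }

A "compatible" upgrade protocol in the sense above would then simply be an assignor that lists both protocols but keeps the eager revocation behavior until the whole group has been bounced.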
In "Rejected Section" (just to clarify >>> I'm not finalizing it as rejected, just putting it there for now, and if >>> we >>> like this one instead we can always switch them) I listed the other >>> approach we once discussed about, and arguing its cons of duplicated class >>> seems overwhelm the pros of saving the "rebalance.protocol" config. >>> >>> Let me know WDYT. >>> >>> Guozhang >>> >>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <ja...@confluent.io> >>> wrote: >>> >>>> Hi Guozhang, >>>> >>>> Responses below: >>>> >>>> 2. The interface's default implementation will just be >>>>> `onPartitionRevoked`, so for user's instantiation if they do not make >>> any >>>>> code changes they should be able to recompile the code and continue. >>>> >>>> >>>> Ack, makes sense. >>>> >>>> 4. Hmm.. not sure if it will work. The main issue is that the >>>>> consumer-coordinator behavior (whether to revoke all or none at >>>>> onRebalancePrepare) is independent of the selected protocol's assignor >>>>> (eager or cooperative), so even if the assignor is selected to be the >>>>> old-versioned one, we will still not revoke at the >>> consumer-coordinator >>>>> layer and hence has the same risk of migrating still-owned partitions, >>>>> right? >>>> >>>> >>>> Yeah, basically we would have to push the eager/cooperative logic into >>> the >>>> PartitionAssignor itself and make the consumer aware of the rebalance >>>> protocol it is compatible with. As long as an eager protocol _could_ be >>>> selected, the consumer would have to be pessimistic and do eager >>>> revocation. But if all the assignors configured in the consumer support >>>> cooperative reassignment, then either 1) a cooperative protocol will be >>>> selected and cooperative revocation can be safely used, or 2) if the >>> rest >>>> of the group does not support it, then the consumer will simply fail. >>>> >>>> Another point which you raised offline and I will repeat here is that >>> this >>>> proposal's benefit is mostly limited to sticky assignment logic. >>> Arguably >>>> the range assignor may have some incidental stickiness, particularly if >>> the >>>> group is rebalancing for a newly created or deleted topic. For other >>> cases, >>>> the proposal is mostly additional overhead since it takes an additional >>>> rebalance and many of the partitions will move. Perhaps it doesn't make >>> as >>>> much sense to use the cooperative protocol for strategies like range and >>>> round-robin. That kind of argues in favor of pushing some of the control >>>> into the assignor itself. Maybe we would not bother creating >>>> CooperativeRange as I suggested above, but it would make sense to >>> create a >>>> cooperative version of the sticky assignment strategy. I thought we >>> might >>>> have to create a new sticky assignor anyway because I can't see how we >>>> would get compatible behavior mixing with the old version anyway. >>>> >>>> Thanks, >>>> Jason >>>> >>>> >>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com> >>> wrote: >>>> >>>>> Hello Matthias: >>>>> >>>>> Thanks for your review. >>>>> >>>>> The background section uses streams assignor as well as the consumer's >>>> own >>>>> stick assignor as examples illustrating the situation, but this KIP is >>>> for >>>>> consumer coordinator itself, and the rest of the paragraph did not >>> talk >>>>> about Streams any more. If you feel it's a bit distracted I can remove >>>>> those examples. >>>>> >>>>> 10). 
>>>>
>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>> Hello Matthias:
>>>>>
>>>>> Thanks for your review.
>>>>>
>>>>> The background section uses the Streams assignor as well as the consumer's own sticky assignor as examples illustrating the situation, but this KIP is for the consumer coordinator itself, and the rest of the write-up does not talk about Streams any more. If you feel it's a bit distracting I can remove those examples.
>>>>>
>>>>> 10). While working on the PR I realized that the revoked partitions on assignment are not needed (this is being discussed on the PR itself: https://github.com/apache/kafka/pull/6528#issuecomment-480009890).
>>>>>
>>>>> 20). 1.a. Good question. I've updated the wiki to let the consumer clean up its assignment and re-join, and not let the assignor make any proactive changes. The idea is to keep the logic simpler and not do any "split brain" stuff.
>>>>>
>>>>> 20). 2.b. No we do not need to, since the owned partitions will be part of the Subscription passed in to assign() already.
>>>>>
>>>>> 30). As Boyang mentioned, there are some drawbacks that cannot be addressed by a rebalance delay, hence we still voted for KIP-345 (some more details can be found in the discussion thread of KIP-345 itself). One example is that as an instance resumes, its member id will be empty, so we are still relying on the assignor to give it the assignment of the old member id while keeping all other members' assignments unchanged.
>>>>>
>>>>> 40). Incomplete sentence, I've updated it.
>>>>>
>>>>> 50). Here's my idea: suppose we augment the join group schema with `protocol version` in 2.3. Then, with both brokers and clients on version 2.3+, on the first rolling bounce where the subscription and assignment schema and / or user metadata has changed, this protocol version will be bumped. On the broker side, when receiving all members' join-group requests, it will choose the one that has the highest protocol version (this also assumes a higher-versioned protocol is always backward compatible, i.e. the coordinator can recognize lower-versioned protocols as well) and select it as the leader. Then the leader can decide, based on its received and deserialized subscription information, how to assign partitions and how to encode the assignment accordingly so that everyone can understand it. With this, in Streams for example, no version probing would be needed since we are guaranteed the leader knows everyone's version -- again assuming that a higher-versioned protocol is always backward compatible -- and hence can successfully do the assignment in that round.
>>>>>
>>>>> 60). My bad, this section was not updated while the design evolved; I've updated it.
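To make the leader-selection rule in point 50 concrete, here is a sketch (the member/field names are made up for illustration; this is not the actual group-coordinator code or request schema):

    import java.util.Collection;
    import java.util.Comparator;

    class LeaderSelectionSketch {
        // Hypothetical view of one member's JoinGroup metadata once a
        // `protocol version` field exists (not the real request schema).
        static class JoinedMember {
            final String memberId;
            final int protocolVersion;

            JoinedMember(String memberId, int protocolVersion) {
                this.memberId = memberId;
                this.protocolVersion = protocolVersion;
            }
        }

        // The coordinator picks the member with the highest protocol version
        // as leader; assuming higher versions are backward compatible, the
        // leader can deserialize every subscription and encode each member's
        // assignment in a format that member understands.
        static JoinedMember selectLeader(Collection<JoinedMember> members) {
            return members.stream()
                    .max(Comparator.comparingInt((JoinedMember m) -> m.protocolVersion))
                    .orElseThrow(() -> new IllegalStateException("empty group"));
        }
    }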
>>>>>
>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com> wrote:
>>>>>
>>>>>> Thanks for the review Matthias! My 2 cents on the rebalance delay: it is a rather fixed trade-off between task availability and resource shuffling. If we eventually trigger a rebalance after a rolling bounce, certain consumer setups would still face global shuffles, for example a member.id-ranking-based round-robin strategy, as rejoining dynamic members will be assigned new member.ids which reorder the assignment. So I think the primary goal of incremental rebalancing is still improving cluster availability during a rebalance, because it doesn't revoke any partitions during the process. Also, the perk is a minimal configuration requirement :)
>>>>>>
>>>>>> Best,
>>>>>> Boyang
>>>>>>
>>>>>> ________________________________
>>>>>> From: Matthias J. Sax <matth...@confluent.io>
>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM
>>>>>> To: dev
>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>>>>
>>>>>> Thanks for the KIP, Boyang and Guozhang!
>>>>>>
>>>>>> I made an initial pass and have some questions/comments. One high-level comment: it seems that the KIP "mixes" the plain consumer and Kafka Streams use cases a little bit (at least in the presentation). It might be helpful to separate both cases clearly, or maybe limit the scope to the plain consumer only.
>>>>>>
>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new method `List<TopicPartition> revokedPartitions()`?
>>>>>>
>>>>>> 20) In Section "Consumer Coordinator Algorithm"
>>>>>>
>>>>>> Bullet point "1a)": If the subscription changes and a topic is removed from the subscription, why do we not revoke the partitions?
>>>>>>
>>>>>> Bullet point "1a)": What happens if a topic is deleted (or a partition is removed/deleted from a topic)? Should we call the new `onPartitionsEmigrated()` callback for this case?
>>>>>>
>>>>>> Bullet point "2b)": Should we update the `PartitionAssignor` interface to pass in the "old assignment" as a third parameter into `assign()`?
>>>>>>
>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay subsume KIP-345? Configuring static members is rather complicated, and I am wondering if a rebalance delay would be sufficient?
>>>>>>
>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b) forever."
>>>>>>
>>>>>> What is "case 3.b"?
>>>>>>
>>>>>> 50) Section "Looking into the Future"
>>>>>>
>>>>>> Nit: the new "ProtocolVersion" field is missing in the first line describing "JoinGroupRequest".
>>>>>>
>>>>>>> This can also help saving "version probing" cost on Streams as well.
>>>>>>
>>>>>> How does this relate to the Kafka Streams "version probing" implementation? How can we exploit the new `ProtocolVersion` in Streams to improve "version probing"? I have a rough idea, but would like to hear more details.
>>>>>>
>>>>>> 60) Section "Recommended Upgrade Procedure"
>>>>>>
>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will force the stream application to stay with protocol type "consumer".
>>>>>>
>>>>>> This config is not discussed in the KIP and appears in this section without context. Can you elaborate on it?
>>>>>>
>>>>>> -Matthias
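Guozhang's answer to 20) 2.b earlier in the thread -- that the owned partitions ride along inside each member's Subscription, so assign() needs no extra parameter -- can be sketched as follows (an illustrative shape only, not the exact class that shipped):

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    class SubscriptionSketch {
        private final List<String> topics;                   // what the member wants
        private final ByteBuffer userData;                   // opaque assignor data
        private final List<TopicPartition> ownedPartitions;  // newly added

        SubscriptionSketch(List<String> topics,
                           ByteBuffer userData,
                           List<TopicPartition> ownedPartitions) {
            this.topics = topics;
            this.userData = userData;
            this.ownedPartitions = Collections.unmodifiableList(ownedPartitions);
        }

        // The leader's assign() reads this to honor stickiness and to work
        // out which partitions must move, without an "old assignment" parameter.
        List<TopicPartition> ownedPartitions() {
            return ownedPartitions;
        }
    }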
>>>>>>
>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote:
>>>>>>> Bumping up this discussion thread. I've added a few new drawings for better illustration; I would really appreciate your feedback.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Boyang,
>>>>>>>>
>>>>>>>> I've made another thorough pass over this KIP and I'd like to split it into two parts: the first part, covered in KIP-429, would touch the Consumer Coordinator only, to put the incremental rebalance protocol in place. The second part (for now I've reserved KIP number 444 for it) would contain all the changes on StreamsPartitionAssignor to allow warming up new members.
>>>>>>>>
>>>>>>>> I think the first part, a.k.a. the current updated KIP-429, is ready for review and discussion again. Would love to hear people's feedback and ideas.
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined:
>>>>>>>>>
>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of "learner task" in addition to "standby task": if the only difference is that for the latter we would consider workload balance while for the former we would not, I think we can just adjust the logic of StickyTaskAssignor a bit to break that difference. Adding a new type of task would add a lot of code complexity, so if we can still piggy-back the logic on a standby task I would prefer to do so.
>>>>>>>>>
>>>>>>>>> In the proposal we stated that we are not adding a new type of task implementation. The learner task shall share the same implementation with the normal standby task, only that we shall tag the standby task as a learner and prioritize the learner tasks' replay effort.
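The "tag, don't subclass" idea for learner tasks might look roughly like this (purely hypothetical names -- the Streams internals are not spelled out in the KIP):

    import java.util.Comparator;
    import java.util.List;

    class LearnerTaskSketch {
        // A learner is just a standby task flagged at assignment time; the
        // flag only changes restoration priority, so no new task class is
        // introduced.
        static class StandbyTaskMeta {
            final String taskId;
            final boolean learner;  // true => warming up a brand-new host
            final long restoreLag;  // records behind the active task

            StandbyTaskMeta(String taskId, boolean learner, long restoreLag) {
                this.taskId = taskId;
                this.learner = learner;
                this.restoreLag = restoreLag;
            }
        }

        // Replay learners first, then ordinary standbys, most-lagging first.
        static void sortForRestore(List<StandbyTaskMeta> tasks) {
            tasks.sort(Comparator
                    .comparing((StandbyTaskMeta t) -> !t.learner)
                    .thenComparingLong(t -> -t.restoreLag));
        }
    }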
>>>>>>>>>
>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is which layer the logic would be implemented at. Although for most KIPs we would not require internal implementation details but only public-facing API updates, for a KIP like this I think it still requires fleshing out details of the implementation design. More specifically: today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside; Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyTaskAssignor. Streams' partition assignor logic today sits in the latter two classes. Hence the hierarchy today is:
>>>>>>>>>>
>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.
>>>>>>>>>>
>>>>>>>>>> We need to think about where the proposed implementation would take place, and personally I think it is not the best option to inject all of it into the StreamsPartitionAssignor / StickyTaskAssignor, since the logic of "triggering another rebalance" etc. would require some coordinator logic which is hard to mimic at the PartitionAssignor level. On the other hand, since we are embedding a KafkaConsumer client as a whole, we cannot just replace ConsumerCoordinator with a specialized StreamsCoordinator like Connect does in KIP-415. So I'd like to maybe split the current proposal into both a consumer layer and a streams-assignor layer like we did in KIP-98/KIP-129. And then the key thing to consider is how to cut the boundary so that the modifications we push to ConsumerCoordinator would be universally beneficial for any consumer, while keeping the Streams-specific logic at the assignor level.
>>>>>>>>>
>>>>>>>>> Yes, that's also my ideal plan. The details of the implementation are depicted in this doc:
>>>>>>>>>
>>>>>>>>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae
>>>>>>>>>
>>>>>>>>> and I have explained the reasoning on why we want to push a global change of replacing ConsumerCoordinator with StreamCoordinator. The motivation is that KIP space is usually used for public & algorithm-level changes, not for internal implementation details.
>>>>>>>>>
>>>>>>>>>> 3. Depending on which design direction we choose, our migration plan would also be quite different. For example, if we stay with ConsumerCoordinator, whose protocol type is still "consumer", and we can manage to make all changes agnostic to brokers as well as to old-versioned consumers, then our migration plan could be much easier.
>>>>>>>>>
>>>>>>>>> Yes, the upgrade plan was designed to take the new StreamCoordinator approach, which means we would define a new protocol type. For an existing application we could only maintain the same `consumer` protocol type because the current broker only allows a change of protocol type when the consumer group is empty. It is of course user-unfriendly to force a wipe-out of the entire application, and I don't think maintaining the old protocol type would greatly impact ongoing services using the new stream coordinator. WDYT?
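For readers less familiar with that hierarchy: Streams wires itself into the rebalance machinery purely through consumer configuration, along these lines (a sketch of the equivalent of what StreamsConfig does internally):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    public class StreamsAssignorWiring {
        public static void main(String[] args) {
            // Sketch: Streams reaches the rebalance machinery only through the
            // consumer's pluggable assignor config; the ConsumerCoordinator that
            // invokes the assignor is hard-coded inside KafkaConsumer.
            Properties consumerProps = new Properties();
            consumerProps.put(
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                "org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor");
        }
    }

Anything that needs to "trigger another rebalance" therefore has to be expressed through what an assignor is allowed to return, which is exactly the boundary question raised above.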
>>>>>>>>>
>>>>>>>>>> 4. I think one major issue related to this KIP is that today, in the StickyPartitionAssignor, we always try to honor stickiness over workload balance, and hence the "learner task" is needed to break this priority, but I'm wondering if we can have a better solution within the sticky task assignor that accommodates this?
>>>>>>>>>
>>>>>>>>> Great question! That's what I explained in the proposal, which is that we should break down our delivery into different stages. At the very beginning, our goal is to trigger learner task assignment only on `new` hosts, which we shall figure out by leveraging the leader's knowledge of the previous round of rebalance. After stage one, our goal is to have a smooth scaling-up experience, but the task balance problem is kind of orthogonal. The load balance problem is a much broader topic than auto-scaling, which I figure is worth discussing within this KIP's context since it's a natural next step, but it wouldn't be the main topic. Learner task or auto-scaling support should be treated as `a helpful mechanism to reach load balance`, not `an algorithm defining load balance`. It would be great if you could share some insights on stream task balance, which would eventually help us break out of KIP-429's scope and even define a separate KIP focusing on task weight & assignment logic improvements.
>>>>>>>>>
>>>>>>>>> Also thank you for making improvements to the KIP's context and organization!
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Boyang
>>>>>>>>>
>>>>>>>>> ________________________________
>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM
>>>>>>>>> To: dev
>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>>>>>>>
>>>>>>>>> Hello Boyang,
>>>>>>>>>
>>>>>>>>> I've just made a quick pass on the KIP and here are some thoughts.
>>>>>>>>>
>>>>>>>>> Meta:
>>>>>>>>>
>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of "learner task" in addition to "standby task": if the only difference is that for the latter we would consider workload balance while for the former we would not, I think we can just adjust the logic of StickyTaskAssignor a bit to break that difference. Adding a new type of task would add a lot of code complexity, so if we can still piggy-back the logic on a standby task I would prefer to do so.
>>>>>>>>>
>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself is which layer the logic would be implemented at. Although for most KIPs we would not require internal implementation details but only public-facing API updates, for a KIP like this I think it still requires fleshing out details of the implementation design. More specifically: today Streams embeds a full-fledged Consumer client, which hard-codes a ConsumerCoordinator inside; Streams then injects a StreamsPartitionAssignor into its pluggable PartitionAssignor interface, and inside the StreamsPartitionAssignor we also have a TaskAssignor interface whose default implementation is StickyTaskAssignor. Streams' partition assignor logic today sits in the latter two classes. Hence the hierarchy today is:
>>>>>>>>>
>>>>>>>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor -> StickyTaskAssignor.
>>>>>>>>>
>>>>>>>>> We need to think about where the proposed implementation would take place, and personally I think it is not the best option to inject all of it into the StreamsPartitionAssignor / StickyTaskAssignor, since the logic of "triggering another rebalance" etc. would require some coordinator logic which is hard to mimic at the PartitionAssignor level. On the other hand, since we are embedding a KafkaConsumer client as a whole, we cannot just replace ConsumerCoordinator with a specialized StreamsCoordinator like Connect does in KIP-415. So I'd like to maybe split the current proposal into both a consumer layer and a streams-assignor layer like we did in KIP-98/KIP-129. And then the key thing to consider is how to cut the boundary so that the modifications we push to ConsumerCoordinator would be universally beneficial for any consumer, while keeping the Streams-specific logic at the assignor level.
>>>>>>>>>
>>>>>>>>> 3. Depending on which design direction we choose, our migration plan would also be quite different. For example, if we stay with ConsumerCoordinator, whose protocol type is still "consumer", and we can manage to make all changes agnostic to brokers as well as to old-versioned consumers, then our migration plan could be much easier.
>>>>>>>>>
>>>>>>>>> 4. I think one major issue related to this KIP is that today, in the StickyPartitionAssignor, we always try to honor stickiness over workload balance, and hence the "learner task" is needed to break this priority, but I'm wondering if we can have a better solution within the sticky task assignor that accommodates this?
>>>>>>>>>
>>>>>>>>> Minor:
>>>>>>>>>
>>>>>>>>> 1. The idea of two rebalances has also been discussed in https://issues.apache.org/jira/browse/KAFKA-6145. So we should add the reference on the wiki page as well.
>>>>>>>>>
>>>>>>>>> 2. Could you also add a section describing how the subscription / assignment metadata will be re-formatted? Without this information it is hard to get to the bottom of your idea. For example, in the "Leader Transfer Before Scaling" section, I'm not sure why "S2 doesn't know S4 is a new member" and hence would blindly obey stickiness over the workload balance requirement.
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <bche...@outlook.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hey community friends,
>>>>>>>>>>
>>>>>>>>>> I'm gladly inviting you to have a look at the proposal to add incremental rebalancing to Kafka Streams, a.k.a. auto-scaling support:
>>>>>>>>>>
>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
>>>>>>>>>>
>>>>>>>>>> Special thanks to Guozhang for giving great guidance and important feedback while making this KIP!
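On Minor point 1 above: the two-rebalance idea can be sketched from a member's point of view as follows (an illustrative pseudo-flow with a hypothetical ConsumerRebalanceTrigger hook, not actual ConsumerCoordinator code):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;

    class CooperativeFlowSketch {
        // Hypothetical stand-in for the coordinator's re-join hook.
        interface ConsumerRebalanceTrigger {
            void requestRejoin();
        }

        // Rebalance #1 hands out a target assignment while each member keeps
        // everything it owned; the member then revokes only the difference
        // and re-joins, so rebalance #2 can give the freed partitions to
        // their new owners.
        static Set<TopicPartition> onSyncGroupResponse(Set<TopicPartition> owned,
                                                       Set<TopicPartition> assigned,
                                                       ConsumerRebalanceTrigger trigger) {
            Set<TopicPartition> revoked = new HashSet<>(owned);
            revoked.removeAll(assigned);        // partitions migrating away

            if (!revoked.isEmpty()) {
                // commit offsets, call onPartitionsRevoked(revoked), then:
                trigger.requestRejoin();        // kicks off rebalance #2
            }
            return assigned;                    // the new owned set
        }
    }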
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Boyang
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>
>>> --
>>> -- Guozhang
>>
>> --
>> -- Guozhang