Thanks for the KIP, Boyang and Guozhang!

I made an initial pass and have some questions/comments. One high-level
comment: it seems that the KIP "mixes" the plain consumer and the Kafka
Streams use cases a little bit (at least in the presentation). It might be
helpful to separate both cases clearly, or maybe to limit the scope to the
plain consumer only.

10) For `PartitionAssignor.Assignment`: It seems we need a new method
`List<TopicPartition> revokedPartitions()`?
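
Something like the sketch below is what I have in mind. It mirrors today's
`PartitionAssignor.Assignment` nested class; the field and constructor
handling is my guess, not part of the KIP:

    import java.nio.ByteBuffer;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of the existing Assignment class plus the proposed addition:
    public class Assignment {
        private final List<TopicPartition> partitions;
        private final List<TopicPartition> revokedPartitions; // proposed new field
        private final ByteBuffer userData;

        public Assignment(List<TopicPartition> partitions,
                          List<TopicPartition> revokedPartitions,
                          ByteBuffer userData) {
            this.partitions = partitions;
            this.revokedPartitions = revokedPartitions;
            this.userData = userData;
        }

        // Existing accessors:
        public List<TopicPartition> partitions() { return partitions; }
        public ByteBuffer userData() { return userData; }

        // Proposed: the partitions the member currently owns but must give
        // up before it may start processing its newly assigned partitions.
        public List<TopicPartition> revokedPartitions() {
            return revokedPartitions;
        }
    }

Otherwise it is unclear to me how a member would learn which of its owned
partitions were revoked in an incremental round.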

20) In Section "Consumer Coordinator Algorithm":

Bullet point "1a)": If the subscription changes and a topic is removed from
the subscription, why do we not revoke the partitions?

Bullet point "1a)": What happens if a topic is deleted (or a partition is
removed/deleted from a topic)? Should we call the new
`onPartitionsEmigrated()` callback for this case?

Bullet point "2b)": Should we update the `PartitionAssignor` interface to
pass in the "old assignment" as a third parameter into `assign()`?
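
To be concrete, the change I am thinking of would look roughly like the
sketch below. The interface name `PartitionAssignorV2` and the parameter
name `previousAssignment` are made up for illustration; `Subscription` and
`Assignment` are the existing nested classes:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
    import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
    import org.apache.kafka.common.Cluster;

    // Hypothetical evolution of the existing PartitionAssignor interface:
    public interface PartitionAssignorV2 {
        // Today the assignor only sees the cluster metadata and the
        // subscriptions. Passing the previous generation's assignment as
        // well would let it compute the revoked/newly-added delta itself.
        Map<String, Assignment> assign(Cluster metadata,
                                       Map<String, Subscription> subscriptions,
                                       Map<String, Assignment> previousAssignment);
    }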

30) Rebalance delay (as used in KIP-415): Could a rebalance delay subsume
KIP-345? Configuring static members is rather complicated, and I am
wondering if a rebalance delay would be sufficient?
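
Just to spell out why I think the delay might be simpler, compare the
knobs involved (a sketch from memory; please double-check the exact
config keys):

    import java.util.Properties;

    public class DelayVsStaticMembership {
        public static void main(String[] args) {
            // KIP-415 style: a single delay config; the assignor simply
            // waits before re-assigning a departed member's work.
            Properties worker = new Properties();
            worker.put("scheduled.rebalance.max.delay.ms", "300000"); // 5 min

            // KIP-345 style: every instance needs its own stable id, and
            // the session timeout must be tuned to cover the longest
            // expected bounce -- two coordinated knobs per member instead
            // of one.
            Properties consumer = new Properties();
            consumer.put("group.instance.id", "instance-1"); // unique per member
            consumer.put("session.timeout.ms", "300000");
        }
    }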

40) Quote: "otherwise the we would fall into the case 3.b) forever."
What is "case 3.b"?

50) Section "Looking into the Future"

Nit: the new "ProtocolVersion" field is missing in the first line
describing "JoinGroupRequest".

> This can also help saving "version probing" cost on Streams as well.

How does this relate to the Kafka Streams "version probing"
implementation? How can we exploit the new `ProtocolVersion` in Streams
to improve "version probing"? I have a rough idea, but would like to hear
more details.

60) Section "Recommended Upgrade Procedure"

> Set the `stream.rebalancing.mode` to `upgrading`, which will force the
> stream application to stay with protocol type "consumer".

This config is not discussed in the KIP and appears in this section
without context. Can you elaborate on it?
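
From the quoted sentence, I would guess the intended usage is something
like the sketch below; but since the config key is never defined in the
KIP, this is purely my reading of that one sentence:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class UpgradeConfigSketch {
        public static Properties upgradeConfig() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Undocumented: key and value taken verbatim from the KIP's
            // upgrade section; there is no StreamsConfig constant for it.
            props.put("stream.rebalancing.mode", "upgrading");
            return props;
        }
    }

If that is the intent, the config should get its own entry in the "Public
Interfaces" section.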

-Matthias


On 3/29/19 6:20 PM, Guozhang Wang wrote:
> Bump up on this discussion thread. I've added a few new drawings for
> better illustration, would really appreciate your feedback.
>
>
> Guozhang
>
> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Boyang,
>>
>> I've made another thorough pass over this KIP and I'd like to split it
>> into two parts: the first part, covered in KIP-429, would touch the
>> Consumer Coordinator only, to put the incremental rebalance protocol
>> in place. The second part (for now I've reserved KIP number 444 for
>> it) would contain all the changes on StreamsPartitionAssignor to allow
>> warming up new members.
>>
>> I think the first part, a.k.a. the current updated KIP-429, is ready
>> for review and discussion again. Would love to hear people's feedback
>> and ideas.
>>
>> Guozhang
>>
>>
>>
>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com> wrote:
>>
>>> Thanks Guozhang for the great questions. Answers are inlined:
>>>
>>> 1. I'm still not sure if it's worthwhile to add a new type of
>>> "learner task" in addition to "standby task": if the only difference
>>> is that for the latter we would consider workload balance while for
>>> the former we would not, I think we can just adjust the logic of
>>> StickyTaskAssignor a bit to break that difference. Adding a new type
>>> of task would add a lot of code complexity, so if we can still
>>> piggy-back the logic on a standby task I would prefer to do so.
>>>
>>> In the proposal we stated that we are not adding a new type of task
>>> implementation. The learner task shall share the same implementation
>>> with a normal standby task, only that we shall tag the standby task
>>> as a learner and prioritize the replay effort of the learner tasks.
>>>
>>> 2. One thing that's still not clear from the KIP wiki itself is at
>>> which layer the logic would be implemented. Although for most KIPs we
>>> would not require internal implementation details but only
>>> public-facing API updates, for a KIP like this I think it still
>>> requires fleshing out details on the implementation design. More
>>> specifically: today Streams embeds a full-fledged Consumer client,
>>> which hard-codes a ConsumerCoordinator inside; Streams then injects a
>>> StreamsPartitionAssignor into its pluggable PartitionAssignor
>>> interface, and inside the StreamsPartitionAssignor we also have a
>>> TaskAssignor interface whose default implementation is
>>> StickyTaskAssignor. Streams' partition assignor logic today sits in
>>> the latter two classes. Hence the hierarchy today is:
>>>
>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor ->
>>> StickyTaskAssignor.
>>>
>>> We need to think about where the proposed implementation would take
>>> place, and personally I think it is not the best option to inject all
>>> of this logic into the StreamsPartitionAssignor / StickyTaskAssignor,
>>> since the logic of "triggering another rebalance" etc. would require
>>> some coordinator logic which is hard to mimic at the PartitionAssignor
>>> level. On the other hand, since we are embedding a KafkaConsumer
>>> client as a whole, we cannot just replace the ConsumerCoordinator
>>> with a specialized StreamsCoordinator like Connect does in KIP-415.
>>> So I'd like to maybe split the current proposal into both a consumer
>>> layer and a streams-assignor layer, like we did in KIP-98/KIP-129.
>>> And then the key thing to consider is how to cut the boundary so that
>>> the modifications we push to the ConsumerCoordinator would be
>>> universally beneficial for any consumer, while keeping the
>>> Streams-specific logic at the assignor level.
>>>
>>> Yes, that's also my ideal plan. The details for the implementation
>>> are depicted in this doc<
>>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae>,
>>> and I have explained the reasoning why we want to push a global
>>> change of replacing the ConsumerCoordinator with a StreamCoordinator.
>>> The motivation is that KIP space is usually used for public and
>>> algorithm-level changes, not for internal implementation details.
>>>
>>> 3. Depending on which design direction we choose, our migration plan
>>> would also be quite different. For example, if we stay with the
>>> ConsumerCoordinator, whose protocol type is still "consumer", and we
>>> can manage to make all changes agnostic to brokers as well as to
>>> old-versioned consumers, then our migration plan could be much easier.
>>>
>>> Yes, the upgrade plan was designed to take the new StreamCoordinator
>>> approach, which means we shall define a new protocol type. For
>>> existing applications we can only maintain the same `consumer`
>>> protocol type, because the current broker only allows a change of
>>> protocol type when the consumer group is empty. It is of course
>>> user-unfriendly to force a wipe-out for the entire application, and I
>>> don't think maintaining the old protocol type would greatly impact
>>> ongoing services using the new stream coordinator. WDYT?
>>>
>>> 4. I think one major issue related to this KIP is that today, in the
>>> StickyTaskAssignor, we always try to honor stickiness over workload
>>> balance, and hence the "learner task" is needed to break this
>>> priority; but I'm wondering if we can have a better solution within
>>> the sticky task assignor that accommodates this?
>>>
>>> Great question! That's what I explained in the proposal, which is
>>> that we should break down our delivery into different stages. At the
>>> very beginning, our goal is to trigger learner task assignment only
>>> on `new` hosts, which we shall figure out by leveraging the leader's
>>> knowledge of the previous round of rebalance. After stage one, our
>>> goal is to have a smooth scaling-up experience, but the task balance
>>> problem is kind of orthogonal. The load balance problem is a much
>>> broader topic than auto scaling, which I figure is worth discussing
>>> within this KIP's context since it's a natural next step, but it
>>> wouldn't be the main topic. Learner tasks or auto-scaling support
>>> should be treated as `a helpful mechanism to reach load balance`, not
>>> `an algorithm defining load balance`. It would be great if you could
>>> share some insights on stream task balance, which would eventually
>>> help us break out of KIP-429's scope and even define a separate KIP
>>> to focus on task weight & assignment logic improvements.
>>>
>>> Also thank you for making improvements to the KIP's context and
>>> organization!
>>>
>>> Best,
>>> Boyang
>>> ________________________________
>>> From: Guozhang Wang <wangg...@gmail.com>
>>> Sent: Saturday, March 2, 2019 6:00 AM
>>> To: dev
>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>
>>> Hello Boyang,
>>>
>>> I've just made a quick pass on the KIP and here are some thoughts.
>>>
>>> Meta:
>>>
>>> 1. I'm still not sure if it's worthwhile to add a new type of
>>> "learner task" in addition to "standby task": if the only difference
>>> is that for the latter we would consider workload balance while for
>>> the former we would not, I think we can just adjust the logic of
>>> StickyTaskAssignor a bit to break that difference. Adding a new type
>>> of task would add a lot of code complexity, so if we can still
>>> piggy-back the logic on a standby task I would prefer to do so.
>>>
>>> 2. One thing that's still not clear from the KIP wiki itself is at
>>> which layer the logic would be implemented. Although for most KIPs we
>>> would not require internal implementation details but only
>>> public-facing API updates, for a KIP like this I think it still
>>> requires fleshing out details on the implementation design. More
>>> specifically: today Streams embeds a full-fledged Consumer client,
>>> which hard-codes a ConsumerCoordinator inside; Streams then injects a
>>> StreamsPartitionAssignor into its pluggable PartitionAssignor
>>> interface, and inside the StreamsPartitionAssignor we also have a
>>> TaskAssignor interface whose default implementation is
>>> StickyTaskAssignor. Streams' partition assignor logic today sits in
>>> the latter two classes. Hence the hierarchy today is:
>>>
>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor ->
>>> StickyTaskAssignor.
>>>
>>> We need to think about where the proposed implementation would take
>>> place, and personally I think it is not the best option to inject all
>>> of this logic into the StreamsPartitionAssignor / StickyTaskAssignor,
>>> since the logic of "triggering another rebalance" etc. would require
>>> some coordinator logic which is hard to mimic at the PartitionAssignor
>>> level. On the other hand, since we are embedding a KafkaConsumer
>>> client as a whole, we cannot just replace the ConsumerCoordinator
>>> with a specialized StreamsCoordinator like Connect does in KIP-415.
>>> So I'd like to maybe split the current proposal into both a consumer
>>> layer and a streams-assignor layer, like we did in KIP-98/KIP-129.
>>> And then the key thing to consider is how to cut the boundary so that
>>> the modifications we push to the ConsumerCoordinator would be
>>> universally beneficial for any consumer, while keeping the
>>> Streams-specific logic at the assignor level.
>>>
>>> 3. Depending on which design direction we choose, our migration plan
>>> would also be quite different. For example, if we stay with the
>>> ConsumerCoordinator, whose protocol type is still "consumer", and we
>>> can manage to make all changes agnostic to brokers as well as to
>>> old-versioned consumers, then our migration plan could be much easier.
>>>
>>> 4. I think one major issue related to this KIP is that today, in the
>>> StickyTaskAssignor, we always try to honor stickiness over workload
>>> balance, and hence the "learner task" is needed to break this
>>> priority; but I'm wondering if we can have a better solution within
>>> the sticky task assignor that accommodates this?
>>>
>>> Minor:
>>>
>>> 1. The idea of two rebalances has also been discussed in
>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we should add
>>> the reference on the wiki page as well.
>>> 2. Could you also add a section describing how the subscription /
>>> assignment metadata will be re-formatted? Without this information it
>>> is hard to get to the bottom of your idea. For example, in the
>>> "Leader Transfer Before Scaling" section, I'm not sure why "S2
>>> doesn't know S4 is new member" and hence would blindly obey
>>> stickiness over the workload balance requirement.
>>>
>>> Guozhang
>>>
>>>
>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <bche...@outlook.com> wrote:
>>>
>>>> Hey community friends,
>>>>
>>>> I'm glad to invite you to have a look at the proposal to add
>>>> incremental rebalancing to Kafka Streams, a.k.a. auto-scaling
>>>> support.
>>>>
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
>>>>
>>>> Special thanks to Guozhang for giving great guidance and important
>>>> feedback while making this KIP!
>>>>
>>>> Best,
>>>> Boyang
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>> --
>> -- Guozhang
>>
>
>