Thanks for the KIP, Boyang and Guozhang!

I made an initial pass and have some questions/comments. One high-level
comment: it seems that the KIP "mixes" the plain consumer and Kafka
Streams use cases a little bit (at least in the presentation). It might
be helpful to separate both cases clearly, or maybe to limit the scope
to the plain consumer only.



10) For `PartitionAssignor.Assignment`: It seems we need a new method
`List<TopicPartition> revokedPartitions()`?
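
Just to illustrate what I have in mind, a rough sketch (not a concrete
proposal; constructor and serialization changes are omitted and would
need a protocol bump as well):

    import java.nio.ByteBuffer;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    public class Assignment {
        private final List<TopicPartition> partitions;
        // new: partitions the member owned in the previous generation
        // but must give up under the new assignment
        private final List<TopicPartition> revokedPartitions;
        private final ByteBuffer userData;

        public Assignment(List<TopicPartition> partitions,
                          List<TopicPartition> revokedPartitions,
                          ByteBuffer userData) {
            this.partitions = partitions;
            this.revokedPartitions = revokedPartitions;
            this.userData = userData;
        }

        public List<TopicPartition> partitions() {
            return partitions;
        }

        public List<TopicPartition> revokedPartitions() {
            return revokedPartitions;
        }

        public ByteBuffer userData() {
            return userData;
        }
    }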



20) In Section "Consumer Coordinator Algorithm"

    Bullet point "1a)": If the subscription changes and a topic is
removed from the subscription, why do we not revoke the partitions?

    Bullet point "1a)": What happens is a topic is deleted (or a
partition is removed/deleted from a topic)? Should we call the new
`onPartitionsEmigrated()` callback for this case?

    Bullet point "2b)" Should we update the `PartitionAssignor`
interface to pass in the "old assignment" as third parameter into
`assign()`?
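
For illustration only (the interface name below is just a placeholder
for this sketch; whether the third parameter should be the full
`Assignment` or only the previously owned partitions is up for
discussion):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Assignment;
    import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription;
    import org.apache.kafka.common.Cluster;

    public interface IncrementalPartitionAssignor {
        // same parameters as today's PartitionAssignor#assign(), plus
        // the assignments of the previous generation, so the assignor
        // can compute the revocation diff itself:
        Map<String, Assignment> assign(
                Cluster metadata,
                Map<String, Subscription> subscriptions,
                Map<String, Assignment> previousAssignments); // new
    }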



30) Rebalance delay (as used in KIP-415): Could a rebalance delay
subsume KIP-345? Configuring static members is rather complicated, and I
am wondering if a rebalance delay would be sufficient.
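
To make the comparison concrete (the consumer-side delay config below
is hypothetical; it just mirrors Connect's
`scheduled.rebalance.max.delay.ms` from KIP-415):

    import java.util.Properties;

    public class RebalanceConfigSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // KIP-345 static membership: every instance needs its own
            // unique, stable id that users must manage across restarts:
            props.put("group.instance.id", "app-instance-7");
            // vs. a single group-wide delay (hypothetical name):
            props.put("rebalance.delay.ms", "300000");
        }
    }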



40) Quote: "otherwise the we would fall into the case 3.b) forever."

What is "case 3.b" ?



50) Section "Looking into the Future"

Nit: the new "ProtocolVersion" field is missing in the first line
describing "JoinGroupRequest"

> This can also help saving "version probing" cost on Streams as well.

How does this relate to the Kafka Streams "version probing"
implementation? How can we exploit the new `ProtocolVersion` in Streams
to improve "version probing"? I have a rough idea, but would like to
hear more details.



60) Section "Recommended Upgrade Procedure"

> Set the `stream.rebalancing.mode` to `upgrading`, which will force the stream 
> application to stay with protocol type "consumer".

This config is not discussed in the KIP and appears in this section
without context. Can you elaborate on it?



-Matthias




On 3/29/19 6:20 PM, Guozhang Wang wrote:
> Bump up on this discussion thread. I've added a few new drawings for better
> illustration; I would really appreciate your feedback.
> 
> 
> Guozhang
> 
> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <wangg...@gmail.com> wrote:
> 
>> Hello Boyang,
>>
>> I've made another thorough pass over this KIP and I'd like to split it
>> into two parts: the first part, covered in KIP-429, would touch on the
>> Consumer Coordinator only, to put the incremental rebalance protocol in
>> place. The second part (for now I've reserved KIP number 444 for it) would
>> contain all the changes on StreamsPartitionAssignor to allow warming up
>> new members.
>>
>> I think the first part, a.k.a. the current updated KIP-429, is ready for
>> review and discussion again. Would love to hear people's feedback and
>> ideas.
>>
>> Guozhang
>>
>>
>>
>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <bche...@outlook.com> wrote:
>>
>>> Thanks Guozhang for the great questions. Answers are inlined:
>>>
>>> 1. I'm still not sure if it's worthwhile to add a new type of "learner
>>> task" in addition to "standby task": if the only difference is that for
>>> the
>>> latter, we would consider workload balance while for the former we would
>>> not, I think we can just adjust the logic of StickyTaskAssignor a bit to
>>> break that difference. Adding a new type of task would be adding a lot of
>>> code complexity, so if we can still piggy-back the logic on a standby-task
>>> I would prefer to do so.
>>> In the proposal we stated that we are not adding a new type of task
>>> implementation. The learner task shall share the same implementation
>>> as a normal standby task; we shall only tag the standby task as a
>>> learner and prioritize the learner tasks' replay effort.
>>> 2. One thing that's still not clear from the KIP wiki itself is at which
>>> layer the logic would be implemented. Although for most KIPs we would not
>>> require internal implementation details but only public-facing API
>>> updates, for a KIP like this I think it still requires fleshing out
>>> details of the implementation design. More specifically: today Streams
>>> embeds a full-fledged Consumer client, which hard-codes a
>>> ConsumerCoordinator inside; Streams then injects a StreamsPartitionAssignor
>>> into its pluggable PartitionAssignor interface, and inside the
>>> StreamsPartitionAssignor we also have a TaskAssignor interface whose
>>> default implementation is StickyTaskAssignor. Streams' partition assignor
>>> logic today sits in the latter two classes. Hence the hierarchy today is:
>>>
>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor ->
>>> StickyTaskAssignor.
>>>
>>> We need to think about where the proposed implementation would take
>>> place, and personally I think it is not the best option to inject all of it
>>> into the StreamsPartitionAssignor / StickyTaskAssignor, since the logic of
>>> "triggering another rebalance" etc. would require some coordinator logic
>>> which is hard to mimic at the PartitionAssignor level. On the other hand,
>>> since we are embedding a KafkaConsumer client as a whole, we cannot just
>>> replace ConsumerCoordinator with a specialized StreamsCoordinator like
>>> Connect does in KIP-415. So I'd like to maybe split the current proposal
>>> into both the consumer layer and the streams-assignor layer like we did in
>>> KIP-98/KIP-129. And then the key thing to consider is how to cut off the
>>> boundary so that the modifications we push to ConsumerCoordinator would be
>>> beneficial universally for any consumer, while keeping the
>>> Streams-specific logic at the assignor level.
>>> Yes, that's also my ideal plan. The details of the implementation are
>>> depicted in this doc<
>>> https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae>,
>>> and I have explained the reasoning for why we want to push a global
>>> change of replacing ConsumerCoordinator with StreamCoordinator. The
>>> motivation is that KIP space is usually used for public & algorithm-level
>>> changes, not for internal implementation details.
>>>
>>> 3. Depending on which design direction we choose, our migration plan
>>> would also be quite different. For example, if we stay with
>>> ConsumerCoordinator, whose protocol type is still "consumer", and we can
>>> manage to make all changes agnostic to brokers as well as to old-versioned
>>> consumers, then our migration plan could be much easier.
>>> Yes, the upgrade plan was designed to take the new StreamCoordinator
>>> approach, which means we shall define a new protocol type. The reason we
>>> could only maintain the same `consumer` protocol type for existing
>>> applications is that the current broker only allows changing the protocol
>>> type when the consumer group is empty. It is of course user-unfriendly to
>>> force a wipe-out of the entire application, and I don't think maintaining
>>> the old protocol type would greatly impact ongoing services using the new
>>> stream coordinator. WDYT?
>>>
>>> 4. I think one major issue related to this KIP is that today, in the
>>> StickyTaskAssignor, we always try to honor stickiness over workload
>>> balance, and hence the "learner task" is needed to break this priority,
>>> but I'm wondering if we can have a better solution within the sticky task
>>> assignor that accommodates this?
>>> Great question! That's what I explained in the proposal, which is that
>>> we should break down our delivery into different stages. At the very
>>> beginning, our goal is to trigger learner task assignment only on `new`
>>> hosts, which we shall figure out by leveraging the leader's knowledge of
>>> the previous round of rebalance. After stage one, our goal is to have a
>>> smooth scaling-up experience, but the task balance problem is kind of
>>> orthogonal. The load balance problem is a much broader topic than auto
>>> scaling, which I figure is worth discussing within this KIP's context
>>> since it's a natural next step, but it wouldn't be the main topic.
>>> Learner task or auto-scaling support should be treated as `a helpful
>>> mechanism to reach load balance`, but not `an algorithm defining load
>>> balance`. It would be great if you could share some insights on stream
>>> task balance, which would eventually help us break out of KIP-429's scope
>>> and even define a separate KIP focusing on task weight & assignment logic
>>> improvements.
>>>
>>> Also, thank you for making improvements to the KIP's content and
>>> organization!
>>>
>>> Best,
>>> Boyang
>>> ________________________________
>>> From: Guozhang Wang <wangg...@gmail.com>
>>> Sent: Saturday, March 2, 2019 6:00 AM
>>> To: dev
>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka Streams
>>>
>>> Hello Boyang,
>>>
>>> I've just made a quick pass on the KIP and here are some thoughts.
>>>
>>> Meta:
>>>
>>> 1. I'm still not sure if it's worthwhile to add a new type of "learner
>>> task" in addition to "standby task": if the only difference is that for
>>> the
>>> latter, we would consider workload balance while for the former we would
>>> not, I think we can just adjust the logic of StickyTaskAssignor a bit to
>>> break that difference. Adding a new type of task would be adding a lot of
>>> code complexity, so if we can still piggy-back the logic on a standby-task
>>> I would prefer to do so.
>>>
>>> 2. One thing that's still not clear from the KIP wiki itself is at which
>>> layer the logic would be implemented. Although for most KIPs we would not
>>> require internal implementation details but only public-facing API
>>> updates, for a KIP like this I think it still requires fleshing out
>>> details of the implementation design. More specifically: today Streams
>>> embeds a full-fledged Consumer client, which hard-codes a
>>> ConsumerCoordinator inside; Streams then injects a StreamsPartitionAssignor
>>> into its pluggable PartitionAssignor interface, and inside the
>>> StreamsPartitionAssignor we also have a TaskAssignor interface whose
>>> default implementation is StickyTaskAssignor. Streams' partition assignor
>>> logic today sits in the latter two classes. Hence the hierarchy today is:
>>>
>>> KafkaConsumer -> ConsumerCoordinator -> StreamsPartitionAssignor ->
>>> StickyTaskAssignor.
>>>
>>> We need to think about where the proposed implementation would take
>>> place, and personally I think it is not the best option to inject all of it
>>> into the StreamsPartitionAssignor / StickyTaskAssignor, since the logic of
>>> "triggering another rebalance" etc. would require some coordinator logic
>>> which is hard to mimic at the PartitionAssignor level. On the other hand,
>>> since we are embedding a KafkaConsumer client as a whole, we cannot just
>>> replace ConsumerCoordinator with a specialized StreamsCoordinator like
>>> Connect does in KIP-415. So I'd like to maybe split the current proposal
>>> into both the consumer layer and the streams-assignor layer like we did in
>>> KIP-98/KIP-129. And then the key thing to consider is how to cut off the
>>> boundary so that the modifications we push to ConsumerCoordinator would be
>>> beneficial universally for any consumer, while keeping the
>>> Streams-specific logic at the assignor level.
>>>
>>> 3. Depending on which design direction we choose, our migration plan
>>> would also be quite different. For example, if we stay with
>>> ConsumerCoordinator, whose protocol type is still "consumer", and we can
>>> manage to make all changes agnostic to brokers as well as to old-versioned
>>> consumers, then our migration plan could be much easier.
>>>
>>> 4. I think one major issue related to this KIP is that today, in the
>>> StickyTaskAssignor, we always try to honor stickiness over workload
>>> balance, and hence the "learner task" is needed to break this priority,
>>> but I'm wondering if we can have a better solution within the sticky task
>>> assignor that accommodates this?
>>>
>>> Minor:
>>>
>>> 1. The idea of two rebalances has also been discussed in
>>> https://issues.apache.org/jira/browse/KAFKA-6145. So we should add the
>>> reference on the wiki page as well.
>>> 2. Could you also add a section describing how the subscription /
>>> assignment metadata will be re-formatted? Without this information it is
>>> hard to get to the bottom of your idea. For example, in the "Leader
>>> Transfer Before Scaling" section, I'm not sure why "S2 doesn't know S4 is
>>> new member" and hence would blindly honor stickiness over the workload
>>> balance requirement.
>>>
>>> Guozhang
>>>
>>>
>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <bche...@outlook.com> wrote:
>>>
>>>> Hey community friends,
>>>>
>>>> I'm gladly inviting you to have a look at the proposal to add
>>>> incremental rebalancing to Kafka Streams, a.k.a. auto-scaling support.
>>>>
>>>>
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
>>>>
>>>> Special thanks to Guozhang for giving great guidance and important
>>>> feedback while making this KIP!
>>>>
>>>> Best,
>>>> Boyang
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>>
>> --
>> -- Guozhang
>>
> 
> 
