Cool, will update the page.

On Wed, May 22, 2019 at 9:38 PM Matthias J. Sax <matth...@confluent.io>
wrote:

> SGTM.
>
> I think we should still document the behavior on the KIP and explain why
> it's implemented that way.
>
>
> -Matthias
>
> On 5/22/19 7:04 PM, Guozhang Wang wrote:
> > Hello Matthias,
> >
> > I've thought about that before, and the reason I did not include this as
> > part of the KIP-429 scope is that fetcher / coordinator may get quite
> > complicated to return non-empty data if "updateAssignmentMetadataIfNeeded"
> > returns false in KafkaConsumer. In addition, when there's a rebalance in
> > progress, letting consumers process data, which may take a long time
> > (in Streams, for example, it is related to the `max.poll.interval.ms`
> > config), could lead to a higher chance of "partitions lost" and wasted
> > processing work.
> >
> > So I've decided to keep it as simple as it is today, and admittedly, from
> > a user's perspective, they may see consecutive "poll" calls returning no
> > data. I will create a JIRA ticket capturing this idea for future discussion
> > on whether we should consider this as a general optimization in the
> > consumer. Does that sound good to you?
> >
> >
> > Guozhang
> >
> >
> > On Wed, May 22, 2019 at 4:31 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Thanks for the KIP and for starting the VOTE, Guozhang. I am overall +1.
> >>
> >> One follow-up thought: the KIP does not discuss in detail how `poll()`
> >> will behave after the change. It might actually be important to ensure
> >> that `poll()` behavior changes to be non-blocking, to allow an
> >> application to process data from non-revoked partitions while a
> >> rebalance is happening in the background.
> >>
> >> Thoughts?
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 5/10/19 1:10 AM, Guozhang Wang wrote:
> >>> Hello Matthias,
> >>>
> >>> I'm proposing to change this behavior holistically inside
> >>> ConsumerCoordinator actually. In other words, I'm trying to piggy-back
> >>> this behavioral fix of KAFKA-4600 along with this KIP, and the motivation
> >>> for me to do this piggy-backing is that, with incremental rebalancing,
> >>> only some of the partitions would be affected, as we are not revoking
> >>> everybody any more.
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Thu, May 9, 2019 at 6:21 AM Matthias J. Sax <matth...@confluent.io>
> >>> wrote:
> >>>
> >>>> Thanks Guozhang!
> >>>>
> >>>> The simplified upgrade path is great!
> >>>>
> >>>>
> >>>> Just a clarification question about the "Rebalance Callback Error
> >>>> Handling" -- does this change affect the `ConsumerCoordinator` only if
> >>>> incremental rebalancing is used? Or does the behavior also change for
> >>>> the eager rebalancing case?
> >>>>
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 5/9/19 3:37 AM, Guozhang Wang wrote:
> >>>>> Hello all,
> >>>>>
> >>>>> Thanks to everyone who has shared feedback on this KIP! If there are
> >>>>> no further comments, I'll start the voting thread by end of tomorrow.
> >>>>>
> >>>>>
> >>>>> Guozhang.
> >>>>>
> >>>>> On Wed, May 8, 2019 at 6:36 PM Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>>>>
> >>>>>> Hello Boyang,
> >>>>>>
> >>>>>> On Wed, May 1, 2019 at 4:51 PM Boyang Chen <bche...@outlook.com>
> >> wrote:
> >>>>>>
> >>>>>>> Hey Guozhang,
> >>>>>>>
> >>>>>>> thank you for the great write-up. Overall the motivation and changes
> >>>>>>> LGTM, just some minor comments:
> >>>>>>>
> >>>>>>>
> >>>>>>>   1.  In "Consumer Coordinator Algorithm", we could reorder the
> >>>>>>> lettered points 3d~3f from ["ready-to-migrate-partitions",
> >>>>>>> "unknown-but-owned-partitions",  "maybe-revoking-partitions"] to
> >>>>>>> ["maybe-revoking-partitions", "ready-to-migrate-partitions",
> >>>>>>> "unknown-but-owned-partitions"] in order to be consistent with 3c1~3.
> >>>>>>>
> >>>>>>
> >>>>>> Ack. Updated.
> >>>>>>
> >>>>>>
> >>>>>>>   2.  In "Consumer Coordinator Algorithm", 1c suggests revoking all
> >>>>>>> partitions upon heartbeat/commit failure. What's the gain here? Do we
> >>>>>>> want to keep all partitions running at this moment, to be optimistic
> >>>>>>> for the case when no partitions get reassigned?
> >>>>>>>
> >>>>>>
> >>>>>> That's a good catch. When REBALANCE_IN_PROGRESS is received, we can
> >>>>>> just re-join the group with all the currently owned partitions encoded.
> >>>>>> Updated.
> >>>>>>
> >>>>>>
> >>>>>>>   3.  In "Recommended Upgrade Procedure", remove extra 'those': "
> The
> >>>>>>> 'sticky' assignor works even those there are "
> >>>>>>>
> >>>>>>
> >>>>>> Ack, should be `even when`.
> >>>>>>
> >>>>>>
> >>>>>>>   4.  Put two "looking into the future" into a separate category
> from
> >>>>>>> migration session. It seems inconsistent for readers to see this
> >>>> before we
> >>>>>>> finished discussion for everything.
> >>>>>>>
> >>>>>>
> >>>>>> Ack.
> >>>>>>
> >>>>>>
> >>>>>>>   5.  Have we discussed the concern about serialization? Could the
> >>>>>>> new metadata we are adding grow larger than the message size cap?
> >>>>>>>
> >>>>>>
> >>>>>> We're completing https://issues.apache.org/jira/browse/KAFKA-7149
> >>>>>> which should largely reduce the message size (will update the wiki
> >>>>>> accordingly as well).
> >>>>>>
> >>>>>>
> >>>>>>>
> >>>>>>> Boyang
> >>>>>>>
> >>>>>>> ________________________________
> >>>>>>> From: Guozhang Wang <wangg...@gmail.com>
> >>>>>>> Sent: Monday, April 15, 2019 9:20 AM
> >>>>>>> To: dev
> >>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
> >> Streams
> >>>>>>>
> >>>>>>> Hello Jason,
> >>>>>>>
> >>>>>>> I agree with you that for range / round-robin it makes less sense to
> >>>>>>> be compatible with the cooperative rebalance protocol.
> >>>>>>>
> >>>>>>> As for StickyAssignor, however, I think it would still be possible to
> >>>>>>> make the current implementation compatible with cooperative
> >>>>>>> rebalance. So after pondering the different options at hand, I'm now
> >>>>>>> proposing this approach as listed in the upgrade section:
> >>>>>>>
> >>>>>>>
> >>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol#KIP-429:KafkaConsumerIncrementalRebalanceProtocol-CompatibilityandUpgradePath
> >>>>>>>
> >>>>>>> The idea is to let each assignor specify which protocols it works
> >>>>>>> with, associating each with a different name; then the upgrade path
> >>>>>>> would involve a "compatible" protocol which actually still uses eager
> >>>>>>> behavior while encoding two assignors if possible. In "Rejected
> >>>>>>> Section" (just to clarify, I'm not finalizing it as rejected, just
> >>>>>>> putting it there for now, and if we like this one instead we can
> >>>>>>> always switch them) I listed the other approach we once discussed,
> >>>>>>> arguing that its con of duplicated classes seems to outweigh the
> >>>>>>> pro of saving the "rebalance.protocol" config.
> >>>>>>>
> >>>>>>> Let me know WDYT.
> >>>>>>>
> >>>>>>> Guozhang
> >>>>>>>
> >>>>>>> On Fri, Apr 12, 2019 at 6:08 PM Jason Gustafson <
> ja...@confluent.io>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi Guozhang,
> >>>>>>>>
> >>>>>>>> Responses below:
> >>>>>>>>
> >>>>>>>> 2. The interface's default implementation will just be
> >>>>>>>>> `onPartitionRevoked`, so for user's instantiation if they do not
> >> make
> >>>>>>> any
> >>>>>>>>> code changes they should be able to recompile the code and
> >> continue.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Ack, makes sense.
> >>>>>>>>
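The default-implementation point in item 2 can be illustrated with a minimal Java sketch. Everything here is an assumption made for illustration: `RebalanceListenerSketch` is a hypothetical stand-in, not the real Kafka listener interface, partitions are plain strings, and the new callback borrows the `onPartitionsEmigrated()` name that appears elsewhere in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a rebalance listener; not the real Kafka interface.
interface RebalanceListenerSketch {
    // Existing callback that every current implementation already provides.
    void onPartitionsRevoked(Collection<String> partitions);

    // New callback (name taken from this thread). Its default body delegates
    // to the existing callback, so old implementations compile unchanged and
    // keep their current behavior.
    default void onPartitionsEmigrated(Collection<String> partitions) {
        onPartitionsRevoked(partitions);
    }
}

// An "old" user implementation that only knows about onPartitionsRevoked.
class LegacyListener implements RebalanceListenerSketch {
    final List<String> revoked = new ArrayList<>();

    @Override
    public void onPartitionsRevoked(Collection<String> partitions) {
        revoked.addAll(partitions);
    }
}

class ListenerDefaultDemo {
    public static void main(String[] args) {
        LegacyListener listener = new LegacyListener();
        // The caller invokes the new callback; the legacy class still reacts.
        listener.onPartitionsEmigrated(Arrays.asList("topic-0", "topic-1"));
        System.out.println(listener.revoked);
    }
}
```

With a default method, only users who want distinct handling for the new callback need to override it; everyone else can recompile and continue.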
> >>>>>>>> 4. Hmm.. not sure if it will work. The main issue is that the
> >>>>>>>>> consumer-coordinator behavior (whether to revoke all or none at
> >>>>>>>>> onRebalancePrepare) is independent of the selected protocol's
> >>>> assignor
> >>>>>>>>> (eager or cooperative), so even if the assignor is selected to be
> >> the
> >>>>>>>>> old-versioned one, we will still not revoke at the
> >>>>>>> consumer-coordinator
> >>>>>>>>> layer and hence has the same risk of migrating still-owned
> >>>> partitions,
> >>>>>>>>> right?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Yeah, basically we would have to push the eager/cooperative logic into
> >>>>>>>> the PartitionAssignor itself and make the consumer aware of the
> >>>>>>>> rebalance protocol it is compatible with. As long as an eager protocol
> >>>>>>>> _could_ be selected, the consumer would have to be pessimistic and do
> >>>>>>>> eager revocation. But if all the assignors configured in the consumer
> >>>>>>>> support cooperative reassignment, then either 1) a cooperative protocol
> >>>>>>>> will be selected and cooperative revocation can be safely used, or 2) if
> >>>>>>>> the rest of the group does not support it, then the consumer will simply
> >>>>>>>> fail.
> >>>>>>>>
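The pessimistic rule described above can be sketched in a few lines of Java. This is only an illustration under stated assumptions: `RevocationModeSketch`, `Protocol`, and `revocationMode` are hypothetical names, and each configured assignor is reduced to the single protocol it supports.

```java
import java.util.Arrays;
import java.util.List;

class RevocationModeSketch {
    // Hypothetical label for the rebalance protocol an assignor supports.
    enum Protocol { EAGER, COOPERATIVE }

    // If any configured assignor is eager, an eager protocol could be
    // selected, so the consumer must revoke eagerly. Only when every
    // configured assignor is cooperative is cooperative revocation safe.
    static Protocol revocationMode(List<Protocol> configuredAssignors) {
        if (configuredAssignors.isEmpty()) {
            throw new IllegalArgumentException("at least one assignor is required");
        }
        for (Protocol supported : configuredAssignors) {
            if (supported == Protocol.EAGER) {
                return Protocol.EAGER;
            }
        }
        return Protocol.COOPERATIVE;
    }

    public static void main(String[] args) {
        // Mixed configuration: falls back to eager revocation.
        System.out.println(revocationMode(Arrays.asList(Protocol.COOPERATIVE, Protocol.EAGER)));
        // All assignors cooperative: cooperative revocation can be used.
        System.out.println(revocationMode(Arrays.asList(Protocol.COOPERATIVE)));
    }
}
```

The sketch covers only the consumer-side decision; case 2) above, where the rest of the group does not support the cooperative protocol and the consumer fails, is not modeled here.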
> >>>>>>>> Another point which you raised offline and I will repeat here is that
> >>>>>>>> this proposal's benefit is mostly limited to sticky assignment logic.
> >>>>>>>> Arguably the range assignor may have some incidental stickiness,
> >>>>>>>> particularly if the group is rebalancing for a newly created or deleted
> >>>>>>>> topic. For other cases, the proposal is mostly additional overhead since
> >>>>>>>> it takes an additional rebalance and many of the partitions will move.
> >>>>>>>> Perhaps it doesn't make as much sense to use the cooperative protocol
> >>>>>>>> for strategies like range and round-robin. That kind of argues in favor
> >>>>>>>> of pushing some of the control into the assignor itself. Maybe we would
> >>>>>>>> not bother creating CooperativeRange as I suggested above, but it would
> >>>>>>>> make sense to create a cooperative version of the sticky assignment
> >>>>>>>> strategy. I thought we might have to create a new sticky assignor anyway
> >>>>>>>> because I can't see how we would get compatible behavior mixing with the
> >>>>>>>> old version.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Jason
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Apr 11, 2019 at 5:53 PM Guozhang Wang <wangg...@gmail.com
> >
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hello Matthias:
> >>>>>>>>>
> >>>>>>>>> Thanks for your review.
> >>>>>>>>>
> >>>>>>>>> The background section uses the Streams assignor as well as the
> >>>>>>>>> consumer's own sticky assignor as examples illustrating the situation,
> >>>>>>>>> but this KIP is for the consumer coordinator itself, and the rest of
> >>>>>>>>> the paragraph does not talk about Streams any more. If you feel it's a
> >>>>>>>>> bit distracting I can remove those examples.
> >>>>>>>>>
> >>>>>>>>> 10). While working on the PR I realized that the revoked partitions
> >>>>>>>>> on assignment are not needed (this is being discussed on the PR itself:
> >>>>>>>>> https://github.com/apache/kafka/pull/6528#issuecomment-480009890).
> >>>>>>>>>
> >>>>>>>>> 20). 1.a. Good question, I've updated the wiki to let the consumer
> >>>>>>>>> clean up its assignment and re-join, without letting the assignor make
> >>>>>>>>> any proactive changes. The idea is to keep the logic simpler and not
> >>>>>>>>> do any "split brain" stuff.
> >>>>>>>>>
> >>>>>>>>> 20). 2.b. No, we do not need to, since the owned-partitions will be
> >>>>>>>>> part of the Subscription passed in to assign() already.
> >>>>>>>>>
> >>>>>>>>> 30). As Boyang mentioned, there are some drawbacks that still cannot
> >>>>>>>>> be addressed by a rebalance delay, hence we still voted for KIP-345
> >>>>>>>>> (some more details can be found on the discussion thread of KIP-345
> >>>>>>>>> itself). One example is that as the instance resumes, its member id
> >>>>>>>>> will be empty, so we are still relying on the assignor to give it the
> >>>>>>>>> assignment from the old member-id while keeping all other members'
> >>>>>>>>> assignments unchanged.
> >>>>>>>>>
> >>>>>>>>> 40). Incomplete sentence, I've updated it.
> >>>>>>>>>
> >>>>>>>>> 50). Here's my idea: suppose we augment the join group schema with
> >>>>>>>>> `protocol version` in 2.3, and then with both brokers and clients
> >>>>>>>>> being in version 2.3+, on the first rolling bounce where subscription
> >>>>>>>>> and assignment schema and / or user metadata has changed, this protocol
> >>>>>>>>> version will be bumped. On the broker side, when receiving all members'
> >>>>>>>>> join-group requests, it will choose the one that has the highest
> >>>>>>>>> protocol version (also it assumes higher versioned protocol is always
> >>>>>>>>> backward compatible, i.e. the coordinator can recognize lower versioned
> >>>>>>>>> protocol as well) and select it as the leader. Then the leader can
> >>>>>>>>> decide, based on its received and deserialized subscription
> >>>>>>>>> information, how to assign partitions and how to encode the assignment
> >>>>>>>>> accordingly so that everyone can understand it. With this, in Streams
> >>>>>>>>> for example, no version probing would be needed since we are guaranteed
> >>>>>>>>> the leader knows everyone's version -- again it is assuming that higher
> >>>>>>>>> versioned protocol is always backward compatible -- and hence can
> >>>>>>>>> successfully do the assignment at that round.
> >>>>>>>>>
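The leader-selection idea in 50) can be sketched as follows. This is a minimal illustration under assumptions, not broker code: `LeaderSelectionSketch` and `selectLeader` are hypothetical names, members are identified by plain strings, and ties are broken by map iteration order, which the thread does not specify.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class LeaderSelectionSketch {
    // Picks the member with the highest protocol version as group leader.
    // Assumption: on a tie, the member seen first in iteration order wins.
    static String selectLeader(Map<String, Integer> protocolVersionByMember) {
        if (protocolVersionByMember.isEmpty()) {
            throw new IllegalArgumentException("group has no members");
        }
        String leader = null;
        int highest = Integer.MIN_VALUE;
        for (Map.Entry<String, Integer> entry : protocolVersionByMember.entrySet()) {
            if (leader == null || entry.getValue() > highest) {
                leader = entry.getKey();
                highest = entry.getValue();
            }
        }
        return leader;
    }

    public static void main(String[] args) {
        // A group in the middle of a rolling bounce: one member already upgraded.
        Map<String, Integer> group = new LinkedHashMap<>();
        group.put("member-a", 1);
        group.put("member-b", 2);
        group.put("member-c", 1);
        System.out.println(selectLeader(group));
    }
}
```

Because the chosen leader has the highest version and higher versions are assumed to be backward compatible, it can decode every other member's subscription, which is why no separate version-probing round would be needed.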
> >>>>>>>>> 60). My bad, this section was not updated while the design was
> >>>>>>>>> evolving; I've updated it.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Apr 9, 2019 at 7:22 PM Boyang Chen <bche...@outlook.com>
> >>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the review Matthias! My 2 cents on the rebalance delay is
> >>>>>>>>>> that it is a rather fixed trade-off between task availability and
> >>>>>>>>>> resource shuffling. If we eventually trigger a rebalance after a
> >>>>>>>>>> rolling bounce, certain consumer setups are still faced with global
> >>>>>>>>>> shuffles, for example a member.id ranking based round-robin strategy,
> >>>>>>>>>> as rejoining dynamic members will be assigned a new member.id, which
> >>>>>>>>>> reorders the assignment. So I think the primary goal of incremental
> >>>>>>>>>> rebalancing is still improving the cluster availability during
> >>>>>>>>>> rebalance, because it doesn't revoke any partition during this
> >>>>>>>>>> process. Also, the perk is the minimal configuration requirement :)
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>>
> >>>>>>>>>> Boyang
> >>>>>>>>>>
> >>>>>>>>>> ________________________________
> >>>>>>>>>> From: Matthias J. Sax <matth...@confluent.io>
> >>>>>>>>>> Sent: Tuesday, April 9, 2019 7:47 AM
> >>>>>>>>>> To: dev
> >>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for Kafka
> >>>>>>> Streams
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the KIP, Boyang and Guozhang!
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> I made an initial pass and have some questions/comments. One
> >>>>>>>>>> high-level comment: it seems that the KIP "mixes" plain consumer and
> >>>>>>>>>> Kafka Streams use cases a little bit (at least in the presentation).
> >>>>>>>>>> It might be helpful to separate both cases clearly, or maybe limit the
> >>>>>>>>>> scope to the plain consumer only.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 10) For `PartitionAssignor.Assignment`: It seems we need a new method
> >>>>>>>>>> `List<TopicPartitions> revokedPartitions()` ?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 20) In Section "Consumer Coordinator Algorithm"
> >>>>>>>>>>
> >>>>>>>>>>     Bullet point "1a)": If the subscription changes and a topic is
> >>>>>>>>>> removed from the subscription, why do we not revoke the partitions?
> >>>>>>>>>>
> >>>>>>>>>>     Bullet point "1a)": What happens if a topic is deleted (or a
> >>>>>>>>>> partition is removed/deleted from a topic)? Should we call the new
> >>>>>>>>>> `onPartitionsEmigrated()` callback for this case?
> >>>>>>>>>>
> >>>>>>>>>>     Bullet point "2b)": Should we update the `PartitionAssignor`
> >>>>>>>>>> interface to pass in the "old assignment" as a third parameter into
> >>>>>>>>>> `assign()`?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 30) Rebalance delay (as used in KIP-415): Could a rebalance delay
> >>>>>>>>>> subsume KIP-345? Configuring static members is rather complicated,
> >>>>>>>>>> and I am wondering if a rebalance delay would be sufficient?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 40) Quote: "otherwise the we would fall into the case 3.b)
> >> forever."
> >>>>>>>>>>
> >>>>>>>>>> What is "case 3.b" ?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 50) Section "Looking into the Future"
> >>>>>>>>>>
> >>>>>>>>>> Nit: the new "ProtocolVersion" field is missing in the first line
> >>>>>>>>>> describing "JoinGroupRequest"
> >>>>>>>>>>
> >>>>>>>>>>> This can also help saving "version probing" cost on Streams as well.
> >>>>>>>>>>
> >>>>>>>>>> How does this relate to the Kafka Streams "version probing"
> >>>>>>>>>> implementation? How can we exploit the new `ProtocolVersion` in Streams
> >>>>>>>>>> to improve "version probing"? I have a rough idea, but would like to
> >>>>>>>>>> hear more details.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 60) Section "Recommended Upgrade Procedure"
> >>>>>>>>>>
> >>>>>>>>>>> Set the `stream.rebalancing.mode` to `upgrading`, which will force
> >>>>>>>>>>> the stream application to stay with protocol type "consumer".
> >>>>>>>>>>
> >>>>>>>>>> This config is not discussed in the KIP and appears in this section
> >>>>>>>>>> without context. Can you elaborate on it?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 3/29/19 6:20 PM, Guozhang Wang wrote:
> >>>>>>>>>>> Bump up on this discussion thread. I've added a few new drawings
> >>>>>>>>>>> for better illustration, and would really appreciate your feedback.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Mar 20, 2019 at 6:17 PM Guozhang Wang <
> >> wangg...@gmail.com
> >>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hello Boyang,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I've made another thorough pass over this KIP and I'd like to split
> >>>>>>>>>>>> it into two parts: the first part, covered in KIP-429, would touch on
> >>>>>>>>>>>> the Consumer Coordinator only, to have the incremental rebalance
> >>>>>>>>>>>> protocol in place. The second part (for now I've reserved KIP number
> >>>>>>>>>>>> 444 for it) would contain all the changes on StreamsPartitionAssignor
> >>>>>>>>>>>> to allow warming up new members.
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think the first part, a.k.a. the current updated KIP-429, is ready
> >>>>>>>>>>>> for review and discussion again. Would love to hear people's feedback
> >>>>>>>>>>>> and ideas.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Mar 4, 2019 at 10:09 AM Boyang Chen <
> >> bche...@outlook.com
> >>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks Guozhang for the great questions. Answers are inlined:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
> >>>>>>>>> "learner
> >>>>>>>>>>>>> task" in addition to "standby task": if the only difference
> is
> >>>>>>> that
> >>>>>>>>> for
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>> latter, we would consider workload balance while for the
> former
> >>>>>>> we
> >>>>>>>>>> would
> >>>>>>>>>>>>> not, I think we can just adjust the logic of
> StickyTaskAssignor
> >>>>>>> a
> >>>>>>>> bit
> >>>>>>>>>> to
> >>>>>>>>>>>>> break that difference. Adding a new type of task would be
> >>>>>>> adding a
> >>>>>>>>> lot
> >>>>>>>>>> of
> >>>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
> >>>>>>>>>> standby-task
> >>>>>>>>>>>>> I would prefer to do so.
> >>>>>>>>>>>>> In the proposal we stated that we are not adding a new type of task
> >>>>>>>>>>>>> implementation. The learner task shall share the same implementation
> >>>>>>>>>>>>> as the normal standby task, only that we shall tag the standby task
> >>>>>>>>>>>>> as a learner and prioritize the learner tasks' replay effort.
> >>>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself
> is
> >>>>>>>> which
> >>>>>>>>>>>>> layer
> >>>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we
> >>>>>>> would
> >>>>>>>>> not
> >>>>>>>>>>>>> require internal implementation details but only public
> facing
> >>>>>>> API
> >>>>>>>>>>>>> updates,
> >>>>>>>>>>>>> for a KIP like this I think it still requires to flesh out
> >>>>>>> details
> >>>>>>>> on
> >>>>>>>>>> the
> >>>>>>>>>>>>> implementation design. More specifically: today Streams embeds a
> >>>>>>>>>>>>> full-fledged Consumer client, which hard-codes a ConsumerCoordinator
> >>>>>>>>>>>>> inside; Streams then injects a StreamsPartitionAssignor into its
> >>>>>>>>>>>>> pluggable PartitionAssignor interface, and inside the
> >>>>>>>>>>>>> StreamsPartitionAssignor we also have a TaskAssignor interface whose
> >>>>>>>>>>>>> default implementation is StickyTaskAssignor. Streams partition
> >>>>>>>>>>>>> assignor logic today sits in the latter two classes. Hence the
> >>>>>>>>>>>>> hierarchy today is:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
> >>>>>>> StreamsPartitionAssignor ->
> >>>>>>>>>>>>> StickyTaskAssignor.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We need to think about where the proposed implementation
> would
> >>>>>>> take
> >>>>>>>>>> place
> >>>>>>>>>>>>> at, and personally I think it is not the best option to
> inject
> >>>>>>> all
> >>>>>>>> of
> >>>>>>>>>> them
> >>>>>>>>>>>>> into the StreamsPartitionAssignor / StickyTaskAssignor since
> >> the
> >>>>>>>>> logic
> >>>>>>>>>> of
> >>>>>>>>>>>>> "triggering another rebalance" etc would require some
> >>>>>>> coordinator
> >>>>>>>>> logic
> >>>>>>>>>>>>> which is hard to mimic at PartitionAssignor level. On the
> other
> >>>>>>>> hand,
> >>>>>>>>>>>>> since
> >>>>>>>>>>>>> we are embedding a KafkaConsumer client as a whole we cannot
> >>>>>>> just
> >>>>>>>>>> replace
> >>>>>>>>>>>>> ConsumerCoordinator with a specialized StreamsCoordinator
> like
> >>>>>>>>> Connect
> >>>>>>>>>>>>> does
> >>>>>>>>>>>>> in KIP-415. So I'd like to maybe split the current proposal
> in
> >>>>>>> both
> >>>>>>>>>>>>> consumer layer and streams-assignor layer like we did in
> >>>>>>>>>> KIP-98/KIP-129.
> >>>>>>>>>>>>> And then the key thing to consider is how to cut off the
> >>>>>>> boundary
> >>>>>>>> so
> >>>>>>>>>> that
> >>>>>>>>>>>>> the modifications we push to ConsumerCoordinator would be
> >>>>>>>> beneficial
> >>>>>>>>>>>>> universally for any consumers, while keep the
> Streams-specific
> >>>>>>>> logic
> >>>>>>>>> at
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>> assignor level.
> >>>>>>>>>>>>> Yes, that's also my ideal plan. The details for the implementation
> >>>>>>>>>>>>> are depicted in this doc
> >>>>>>>>>>>>> <https://docs.google.com/document/d/1me2a5wvxAZT1QE6HkwyDl7C2TiBQlKN3Dpw_I1ro91U/edit#heading=h.qix74qdmekae>
> >>>>>>>>>>>>> ("KStream Smooth Auto-scaling Implementation Plan"),
> >>>>>>>>>>>>> and I have explained the reasoning on why we want to push a
> >>>>>>>>>>>>> global change of replacing ConsumerCoordinator with StreamCoordinator.
> >>>>>>>>>>>>> The motivation is that the KIP space is usually used for public &
> >>>>>>>>>>>>> algorithm-level changes, not for internal implementation details.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3. Depending on which design direction we choose, our
> migration
> >>>>>>>> plan
> >>>>>>>>>> would
> >>>>>>>>>>>>> also be quite different. For example, if we stay with
> >>>>>>>>>> ConsumerCoordinator
> >>>>>>>>>>>>> whose protocol type is "consumer" still, and we can manage to
> >>>>>>> make
> >>>>>>>>> all
> >>>>>>>>>>>>> changes agnostic to brokers as well as to old versioned
> >>>>>>> consumers,
> >>>>>>>>> then
> >>>>>>>>>>>>> our
> >>>>>>>>>>>>> migration plan could be much easier.
> >>>>>>>>>>>>> Yes, the upgrade plan was designed to take the new StreamCoordinator
> >>>>>>>>>>>>> approach, which means we shall define a new protocol type. For
> >>>>>>>>>>>>> existing applications we could only maintain the same `consumer`
> >>>>>>>>>>>>> protocol type, because the current broker only allows a change of
> >>>>>>>>>>>>> protocol type when the consumer group is empty. It is of course
> >>>>>>>>>>>>> user-unfriendly to force a wipe-out of the entire application, and I
> >>>>>>>>>>>>> don't think maintaining the old protocol type would greatly impact
> >>>>>>>>>>>>> ongoing services using the new stream coordinator. WDYT?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4. I think one major issue related to this KIP is that today,
> >> in
> >>>>>>>> the
> >>>>>>>>>>>>> StickyPartitionAssignor, we always try to honor stickiness
> over
> >>>>>>>>>> workload
> >>>>>>>>>>>>> balance, and hence "learner task" is needed to break this
> >>>>>>> priority,
> >>>>>>>>> but
> >>>>>>>>>>>>> I'm
> >>>>>>>>>>>>> wondering if we can have a better solution within sticky task
> >>>>>>>>> assignor
> >>>>>>>>>>>>> that
> >>>>>>>>>>>>> accommodate this?
> >>>>>>>>>>>>> Great question! That's what I explained in the proposal, which is
> >>>>>>>>>>>>> that we should break down our delivery into different stages. At the
> >>>>>>>>>>>>> very beginning, our goal is to trigger learner task assignment only
> >>>>>>>>>>>>> on `new` hosts, which we shall figure out by leveraging the leader's
> >>>>>>>>>>>>> knowledge of the previous round of rebalance. After stage one, our
> >>>>>>>>>>>>> goal is to have a smooth scaling-up experience, but the task balance
> >>>>>>>>>>>>> problem is kind of orthogonal.
> >>>>>>>>>>>>> The load balance problem is a much broader topic than auto scaling,
> >>>>>>>>>>>>> which I figure is worth discussing within this KIP's context since
> >>>>>>>>>>>>> it's a natural next step, but wouldn't be the main topic.
> >>>>>>>>>>>>> Learner task or auto scaling support should be treated as `a helpful
> >>>>>>>>>>>>> mechanism to reach load balance`, but not `an algorithm defining load
> >>>>>>>>>>>>> balance`. It would be great if you could share some insights on the
> >>>>>>>>>>>>> stream task balance, which eventually helps us to break out of
> >>>>>>>>>>>>> KIP-429's scope and even define a separate KIP to focus on task
> >>>>>>>>>>>>> weight & assignment logic improvement.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Also thank you for making improvements to the KIP context and
> >>>>>>>>>>>>> organization!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Boyang
> >>>>>>>>>>>>> ________________________________
> >>>>>>>>>>>>> From: Guozhang Wang <wangg...@gmail.com>
> >>>>>>>>>>>>> Sent: Saturday, March 2, 2019 6:00 AM
> >>>>>>>>>>>>> To: dev
> >>>>>>>>>>>>> Subject: Re: [DISCUSS] KIP-429 : Smooth Auto-Scaling for
> Kafka
> >>>>>>>>> Streams
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Hello Boyang,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I've just made a quick pass on the KIP and here are some
> >>>>>>> thoughts.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Meta:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. I'm still not sure if it's worthwhile to add a new type of
> >>>>>>>>> "learner
> >>>>>>>>>>>>> task" in addition to "standby task": if the only difference
> is
> >>>>>>> that
> >>>>>>>>> for
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>> latter, we would consider workload balance while for the
> former
> >>>>>>> we
> >>>>>>>>>> would
> >>>>>>>>>>>>> not, I think we can just adjust the logic of
> StickyTaskAssignor
> >>>>>>> a
> >>>>>>>> bit
> >>>>>>>>>> to
> >>>>>>>>>>>>> break that difference. Adding a new type of task would be
> >>>>>>> adding a
> >>>>>>>>> lot
> >>>>>>>>>> of
> >>>>>>>>>>>>> code complexity, so if we can still piggy-back the logic on a
> >>>>>>>>>> standby-task
> >>>>>>>>>>>>> I would prefer to do so.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2. One thing that's still not clear from the KIP wiki itself
> is
> >>>>>>>> which
> >>>>>>>>>>>>> layer
> >>>>>>>>>>>>> would the logic be implemented at. Although for most KIPs we
> >>>>>>> would
> >>>>>>>>> not
> >>>>>>>>>>>>> require internal implementation details but only public
> facing
> >>>>>>> API
> >>>>>>>>>>>>> updates,
> >>>>>>>>>>>>> for a KIP like this I think it still requires to flesh out
> >>>>>>> details
> >>>>>>>> on
> >>>>>>>>>> the
> >>>>>>>>>>>>> implementation design. More specifically: today Streams embeds a
> >>>>>>>>>>>>> full-fledged Consumer client, which hard-codes a ConsumerCoordinator
> >>>>>>>>>>>>> inside; Streams then injects a StreamsPartitionAssignor into its
> >>>>>>>>>>>>> pluggable PartitionAssignor interface, and inside the
> >>>>>>>>>>>>> StreamsPartitionAssignor we also have a TaskAssignor interface whose
> >>>>>>>>>>>>> default implementation is StickyTaskAssignor. Streams partition
> >>>>>>>>>>>>> assignor logic today sits in the latter two classes. Hence the
> >>>>>>>>>>>>> hierarchy today is:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> KafkaConsumer -> ConsumerCoordinator ->
> >>>>>>> StreamsPartitionAssignor ->
> >>>>>>>>>>>>> StickyTaskAssignor.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> We need to think about where the proposed implementation
> >>>>>>>>>>>>> would take place, and personally I think it is not the best
> >>>>>>>>>>>>> option to inject all of it into the
> >>>>>>>>>>>>> StreamsPartitionAssignor / StickyTaskAssignor, since the
> >>>>>>>>>>>>> logic of "triggering another rebalance" etc. would require
> >>>>>>>>>>>>> some coordinator logic which is hard to mimic at the
> >>>>>>>>>>>>> PartitionAssignor level. On the other hand, since we are
> >>>>>>>>>>>>> embedding a KafkaConsumer client as a whole, we cannot just
> >>>>>>>>>>>>> replace ConsumerCoordinator with a specialized
> >>>>>>>>>>>>> StreamsCoordinator like Connect does in KIP-415. So I'd like
> >>>>>>>>>>>>> to maybe split the current proposal across both the consumer
> >>>>>>>>>>>>> layer and the streams-assignor layer, like we did in
> >>>>>>>>>>>>> KIP-98/KIP-129. And then the key thing to consider is where
> >>>>>>>>>>>>> to draw the boundary so that the modifications we push to
> >>>>>>>>>>>>> ConsumerCoordinator would be universally beneficial for any
> >>>>>>>>>>>>> consumer, while keeping the Streams-specific logic at the
> >>>>>>>>>>>>> assignor level.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 3. Depending on which design direction we choose, our
> >>>>>>>>>>>>> migration plan would also be quite different. For example,
> >>>>>>>>>>>>> if we stay with ConsumerCoordinator, whose protocol type is
> >>>>>>>>>>>>> still "consumer", and we can manage to make all changes
> >>>>>>>>>>>>> agnostic to brokers as well as to old-versioned consumers,
> >>>>>>>>>>>>> then our migration plan could be much easier.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 4. I think one major issue related to this KIP is that
> >>>>>>>>>>>>> today, in the StickyTaskAssignor, we always try to honor
> >>>>>>>>>>>>> stickiness over workload balance, and hence the "learner
> >>>>>>>>>>>>> task" is needed to break this priority, but I'm wondering if
> >>>>>>>>>>>>> we can have a better solution within the sticky task
> >>>>>>>>>>>>> assignor that accommodates this?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Minor:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1. The idea of two rebalances has also been discussed in
> >>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-6145, so we
> >>>>>>>>>>>>> should add the reference on the wiki page as well.
> >>>>>>>>>>>>> 2. Could you also add a section describing how the
> >>>>>>>>>>>>> subscription / assignment metadata will be re-formatted?
> >>>>>>>>>>>>> Without this information it is hard to get to the bottom of
> >>>>>>>>>>>>> your idea. For example, in the "Leader Transfer Before
> >>>>>>>>>>>>> Scaling" section, I'm not sure why "S2 doesn't know S4 is
> >>>>>>>>>>>>> new member" and hence would blindly obey stickiness over the
> >>>>>>>>>>>>> workload balance requirement.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Thu, Feb 28, 2019 at 11:05 AM Boyang Chen <bche...@outlook.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hey community friends,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'm gladly inviting you to have a look at the proposal to
> >>>>>>>>>>>>>> add incremental rebalancing to Kafka Streams, A.K.A
> >>>>>>>>>>>>>> auto-scaling support.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Smooth+Auto-Scaling+for+Kafka+Streams
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Special thanks to Guozhang for giving great guidance and
> >>>>>>>>>>>>>> important feedback while making this KIP!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Boyang
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> -- Guozhang
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

-- 
-- Guozhang
