Hi John,

I think I may mis-communicate my ideas a bit, what I meant is simply for
case 1), let the active tasks reporting a lag of 0 instead of not reporting
any lags at all; in addition, for other hosts not hosting this tasks, and
hence we do not know the offset lag at all we set the
"StatefulTasksToRankedCandidates[task][1]
:= instance" just to make sure the current host is ranked the first while
all others are ranked equally after it. In this case we would still favor
stickiness for not moving such tasks out of its current host unless it
violates balance, in which case we are free to move it to any other hosts.
Does that make sense?

Also I just realized that in the update wiki page the algorithm section of
iteratively assigning / moving tasks based on
StatefulTasksToRankedCandidates until convergence was missing (it was there
in the previous version), could you add it back?

Guozhang


On Wed, Sep 4, 2019 at 2:59 PM John Roesler <j...@confluent.io> wrote:

> Thanks for the reply, Guozhang,
>
> I'll start by re-stating your cases, just to make sure we're on the same
> page...
>
> 1) The task is stateful, and all its stores are non-logged. For this case
> under the proposal, we would not have any standbys and the active version
> would actually not report any lag at all (not a lag of 0). Thus, all
> instances would be considered equal when it comes to assignment, although
> the assignment logic would know that the task is "heavy" because it has
> those non-logged stores and factor that in to the overall cluster balance.
>
> 2) The task is stateful and maybe has one logged and one non-logged store.
> As you say, in this case, the non-logged store would not contribute at all
> to anyone's reported lag. The active task would report a lag of 0, and the
> standbys would report their lag on the logged store. The assignment logic
> would take these reported lags into account.
>
> It sounds like there might be some dissonance on point (1). It sounds like
> you're saying we should try to assign non-logged stateful tasks back to an
> instance that has previously hosted it, or maybe we should assign it back
> to the most recent instance that hosted it, and then only reassign it if
> the movement would affect balance. I guess I'm not opposed to this, but I
> also don't really see the advantage. The fact is that we'd still wind up
> migrating it much of the time, so no one could depend on it not getting
> migrated, and at the same time we're paying a significant complexity cost
> in the assignor to support this case. Plus, we need to encode the previous
> owner of the task somehow in the wire protocol, which means we pay a
> payload-size penalty to support it as well.
>
> On the other hand, we should make our assignment as stable as possible in
> the implementation to avoid pointless "swapping" of logged and non-logged
> stateful tasks, as well as stateless tasks when it doesn't change the
> balance at all to do so. It seems like this should accomplish the same
> spiritual goal with no special cases.
>
> WDYT?
> -John
>
> On Wed, Sep 4, 2019 at 2:26 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello John,
> >
> > Your reasoning about non-logged state store looks good to me. But I think
> > it worth considering two cases differently: 1) a task's all state stores
> > are non-logged, 2) a task has non-logged state store but also have other
> > logged state store.
> >
> > For 1), we should not have any standbys for such tasks at all, but it is
> > not the same as a stateless task, because the only active task could
> report
> > a lag of "0" while all others should logically report the lag as infinity
> > (of course, they would not encode such value).
> >
> > For 2), it is sort of the same to a normal case: active task reporting a
> > lag of 0 while standby task reporting a lag summing all other logged
> state
> > stores, meaning that the non-logged state store does not matter in our
> > assignment semantics.
> >
> > The rationale I had is to effectively favor stickiness for those tasks in
> > case 1) than arbitrarily reassign them even though it is true that for a
> > non-logged store there's no durability and hence restoration guarantees
> > anyways from Streams. Then algorithmically we may consider assigning
> tasks
> > with descending order of the number of instances reporting lags for it,
> > then for such tasks there's always one candidate reporting a lag of 0,
> and
> > assigning them to any instance does not actually change the cumulated
> total
> > lag either. At that time we can decide "if it is still within load
> balance
> > factor, then let's respect stickiness, otherwise it's free to move".
> WDYT?
> >
> >
> > Guozhang
> >
> > On Wed, Sep 4, 2019 at 8:30 AM John Roesler <j...@confluent.io> wrote:
> >
> > > Hey Bruno,
> > >
> > > Thanks for taking another look. Some quick responses:
> > >
> > > 1) It just means the number of offsets in the topic. E.g., the LSO is
> > 100,
> > > but the first offset is 40 due to retention, so there are 60 offsets in
> > the
> > > topic. Further, the lag on that topic would be considered to be 60 for
> > any
> > > task that hadn't previously done any work on it.
> > >
> > > 2) This is undecidable in general. I.e., there's no way we can know
> > whether
> > > the store is remote or not, and hence whether we can freely assign it
> to
> > > another instance, or whether we have to keep it on the same instance.
> > > However, there are a couple of reasons to go ahead and assume we have
> the
> > > freedom to move such tasks.
> > > * We know that nothing can prevent the loss of an instance in a cluster
> > > (I.e., this is true of all cloud environments, as well as any managed
> > > virtualized cluster like mesos or kubernetes), so any Streams program
> > that
> > > makes use of non-remote, non-logged state is doomed to lose its state
> > when
> > > it loses an instance.
> > > * If we take on a restriction that we cannot move such state between
> > > instances, we'd become overconstrained very quickly. Effectively, if
> you
> > > made use of non-logged stores, and we didn't assume freedom of
> movement,
> > > then we couldn't make use of any new instances in your cluster.
> > > * On the other hand, if we optimistically assume we can't move state,
> but
> > > only reassign it when we lose an instance, then we're supporting
> > > non-deterministic logic, because the program would produce different
> > > results, depending on whether you lost a node during the execution or
> > not.
> > > 2b) That last point goes along with your side note. I'm not sure if we
> > > should bother dropping such state on every reassignment, though. It
> seems
> > > to be undefined territory enough that we can just do the simplest thing
> > and
> > > assume people have made their own (external) provisions for durability.
> > > I.e., when we say "non-logged", we mean that it doesn't make use of
> _our_
> > > durability mechanism. I'm arguing that the only sane assumption is that
> > > such folks have opted to use their own durability measures, and we
> should
> > > just assume it works with no special considerations in the assignment
> > > algorithm.
> > >
> > > 3) Good catch! I've fixed it.
> > >
> > > Thanks again!
> > > -John
> > >
> > > On Wed, Sep 4, 2019 at 6:09 AM Bruno Cadonna <br...@confluent.io>
> wrote:
> > >
> > > > Hi,
> > > >
> > > > 1) What do you mean with "full set of offsets in the topic"? Is this
> > > > the sum of all offsets of the changelog partitions of the task?
> > > >
> > > > 2) I am not sure whether non-logged stateful tasks should be
> > > > effectively treated as stateless tasks during assignment. First we
> > > > need to decide whether a non-logged stateful task should preferably
> be
> > > > assigned to the same instance on which it just run in order to
> > > > continue to use its state or not.
> > > >
> > > > 3) In the example, you define stand-by tasks {S1, S2, ...} but never
> > > > use them, because below you use a dedicated row for stand-by tasks.
> > > >
> > > > As a side note to 2) since it is not directly related to this KIP: We
> > > > should decide if we want to avoid the possible non-determinism
> > > > introduced by non-logged stores or not. That is, if an instance hosts
> > > > a task with non-logged stores then we can have two cases after the
> > > > next rebalance: a) the task stays on the same instance and continues
> > > > to use the same state store as used so far or b) the task is assigned
> > > > to another instance and it starts an empty state store. The produced
> > > > results for these two cases might differ. To avoid the
> nondeterminism,
> > > > non-logged state stores would need to be wiped out before assignment.
> > > > Then the question arises, how the removal of non-logged state stores
> > > > before assignment would affect backward-compatibility.
> > > >
> > > > Best,
> > > > Bruno
> > > >
> > > > On Wed, Aug 21, 2019 at 11:40 PM John Roesler <j...@confluent.io>
> > wrote:
> > > > >
> > > > > Hi Guozhang,
> > > > >
> > > > > > My impression from your previous email is that inside the
> algorithm
> > > > when
> > > > > we
> > > > > are "filling" them to instances some deterministic logic would be
> > used
> > > to
> > > > > avoid the above case, is that correct?
> > > > >
> > > > > Yes, that was my plan, but I didn't formalize it. There was a
> > > requirement
> > > > > that the assignment algorithm must not produce a new assignment if
> > the
> > > > > current assignment is already balanced, so at the least, any
> > thrashing
> > > > > would be restricted to the "balancing" phase while tasks are moving
> > > > around
> > > > > the cluster.
> > > > >
> > > > > Anyway, I think it would be good to say that we'll "try to" produce
> > > > stable
> > > > > assignments, so I've added a "should" clause to the assignment
> spec:
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-AssignmentAlgorithm
> > > > >
> > > > > For example, we would sort the stateless tasks and available
> > instances
> > > > > before assigning them, so that the stateless task assignment would
> > > mostly
> > > > > stay stable between assignments, modulo the compute capacity of the
> > > > > instances changing a little as active stateful tasks get assigned
> in
> > > more
> > > > > balanced ways.
> > > > >
> > > > > Thanks,
> > > > > -John
> > > > >
> > > > >
> > > > > On Wed, Aug 21, 2019 at 1:55 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hello John,
> > > > > >
> > > > > > That sounds reasonable. Just double checked the code that with
> > > logging
> > > > > > disabled the corresponding checkpoint file would not contain any
> > > > values,
> > > > > > just like a stateless task. So I think treating them logically
> the
> > > > same is
> > > > > > fine.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 21, 2019 at 11:41 AM John Roesler <j...@confluent.io
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi again, Guozhang,
> > > > > > >
> > > > > > > While writing up the section on stateless tasks (
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statelesstasks
> > > > > > > ),
> > > > > > > I reconsidered whether stateful, but non-logged, tasks should
> > > > actually
> > > > > > > report a lag of zero, versus not reporting any lag. By the
> > > > definition of
> > > > > > > the "StatefulTasksToRankedCandidates" function, the leader
> would
> > > > compute
> > > > > > a
> > > > > > > lag of zero for these tasks anyway.
> > > > > > >
> > > > > > > Therefore, I think the same reasoning that I supplied you for
> > > > stateless
> > > > > > > tasks applies, since the member and leader will agree on a lag
> of
> > > > zero
> > > > > > > anyway, we can avoid adding them to the "Task Lags" map, and
> save
> > > > some
> > > > > > > bytes in the JoinGroup request. This would be especially
> > beneficial
> > > > in an
> > > > > > > application that uses remote stores for _all_ its state stores,
> > it
> > > > would
> > > > > > > have an extremely lightweight JoinGroup request, with no task
> > lags
> > > at
> > > > > > all.
> > > > > > >
> > > > > > > WDYT?
> > > > > > > -John
> > > > > > >
> > > > > > > On Wed, Aug 21, 2019 at 1:17 PM John Roesler <
> j...@confluent.io>
> > > > wrote:
> > > > > > >
> > > > > > > > Thanks, Guozhang.
> > > > > > > >
> > > > > > > > (Side note: I noticed on another pass over the discussion
> that
> > > I'd
> > > > > > missed
> > > > > > > > addressing your comment about the potential race condition
> > > between
> > > > > > state
> > > > > > > > cleanup and lag-based assignment. I've added a solution to
> the
> > > > > > proposal:
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Racebetweenassignmentandstatecleanup
> > > > > > > > )
> > > > > > > >
> > > > > > > > In the JoinGroup (SubscriptionInfo) metadata, stateless tasks
> > are
> > > > not
> > > > > > > > represented at all. This should save us some bytes in the
> > request
> > > > > > > metadata.
> > > > > > > > If we treated them like non-logged stateful tasks and
> reported
> > a
> > > > lag of
> > > > > > > 0,
> > > > > > > > the only difference is that the assignor would be able to
> tell
> > > > which
> > > > > > > > members previously hosted that stateless task.
> > > > > > > >
> > > > > > > > I'd like to make a simplifying assumption that stateless
> tasks
> > > can
> > > > just
> > > > > > > be
> > > > > > > > freely reassigned with no regard to stickiness at all,
> without
> > > > > > impacting
> > > > > > > > performance. This is almost true. In fact, while assigned a
> > > > stateless
> > > > > > > task,
> > > > > > > > a member fetches batches of records from the broker, so if we
> > > move
> > > > the
> > > > > > > > stateless task assignment, this buffered input is wasted and
> > just
> > > > gets
> > > > > > > > dropped.
> > > > > > > >
> > > > > > > > However, we won't be moving the stateless tasks around all
> the
> > > time
> > > > > > (just
> > > > > > > > during rebalances), and we have the requirement that the
> > > assigment
> > > > > > > > algorithm must stabilize to guard against perpetually
> > shuffling a
> > > > > > > stateless
> > > > > > > > task from one node to another. So, my hope is that this small
> > > > amount of
> > > > > > > > inefficiency would not be a performance-dominating factor. In
> > > > exchange,
> > > > > > > we
> > > > > > > > gain the opportunity for the assignment algorithm to use the
> > > > stateless
> > > > > > > > tasks as "filler" during unbalanced assignments. For example,
> > if
> > > > there
> > > > > > > is a
> > > > > > > > node that is just warming up with several standby tasks,
> maybe
> > > the
> > > > > > > > assignment can give more stateless tasks to that node to
> > balance
> > > > the
> > > > > > > > computational load across the cluster.
> > > > > > > >
> > > > > > > > It's worth noting that such an assignment would still not be
> > > > considered
> > > > > > > > "balanced", so the ultimately balanced final state of the
> > > > assignment
> > > > > > > (after
> > > > > > > > task movements) would still have the desired property that
> each
> > > > > > stateful
> > > > > > > > and stateless task is evenly spread across the cluster.
> > > > > > > >
> > > > > > > > Does that seem reasonable?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > -John
> > > > > > > >
> > > > > > > > On Wed, Aug 21, 2019 at 11:22 AM Guozhang Wang <
> > > wangg...@gmail.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hello John,
> > > > > > > >>
> > > > > > > >> I've made another pass on the wiki page again, overall LGTM.
> > One
> > > > meta
> > > > > > > >> comment about the "stateless" tasks: how do we represent
> them
> > in
> > > > the
> > > > > > > >> metadata? Are they just treated as stateful tasks with
> logging
> > > > > > disabled,
> > > > > > > >> or
> > > > > > > >> are specially handled? It is not very clear in the
> > description.
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> Guozhang
> > > > > > > >>
> > > > > > > >> On Wed, Aug 21, 2019 at 8:43 AM John Roesler <
> > j...@confluent.io
> > > >
> > > > > > wrote:
> > > > > > > >>
> > > > > > > >> > I have also specifically called out that the assignment
> must
> > > > achieve
> > > > > > > >> both
> > > > > > > >> > "instance" and "task" balance:
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Defining%22balance%22
> > > > > > > >> >
> > > > > > > >> > I've also addressed the problem of state stores with
> logging
> > > > > > disabled:
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Statewithloggingdisabled
> > > > > > > >> >
> > > > > > > >> > I believe this addresses all the concerns that have been
> > > raised
> > > > to
> > > > > > > date.
> > > > > > > >> > Apologies if I've overlooked one of your concerns.
> > > > > > > >> >
> > > > > > > >> > Please give the KIP another read and let me know of any
> > > further
> > > > > > > >> thoughts!
> > > > > > > >> > Hopefully, we can start the voting on this KIP by the end
> of
> > > the
> > > > > > week.
> > > > > > > >> >
> > > > > > > >> > Thanks,
> > > > > > > >> > -John
> > > > > > > >> >
> > > > > > > >> > On Tue, Aug 20, 2019 at 5:16 PM John Roesler <
> > > j...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > > >> >
> > > > > > > >> > > In response to Bruno's concern #2, I've also added that
> > > > section to
> > > > > > > the
> > > > > > > >> > > "Rejected Alternatives" section.
> > > > > > > >> > >
> > > > > > > >> > > Additionally, after reviewing some other assignment
> > papers,
> > > > I've
> > > > > > > >> > developed
> > > > > > > >> > > the concern that specifying which "phases" the
> assignment
> > > > > > algorithm
> > > > > > > >> > should
> > > > > > > >> > > have, or indeed the logic of it at all, might be a
> mistake
> > > > that
> > > > > > > >> > > over-constrains our ability to write an optimal
> algorithm.
> > > > > > > Therefore,
> > > > > > > >> > I've
> > > > > > > >> > > also refactored the KIP to just describe the protocol,
> and
> > > > specify
> > > > > > > the
> > > > > > > >> > > requirements for the assignment algorithm, but not its
> > exact
> > > > > > > behavior
> > > > > > > >> at
> > > > > > > >> > > all.
> > > > > > > >> > >
> > > > > > > >> > > Thanks,
> > > > > > > >> > > -John
> > > > > > > >> > >
> > > > > > > >> > > On Tue, Aug 20, 2019 at 5:13 PM John Roesler <
> > > > j...@confluent.io>
> > > > > > > >> wrote:
> > > > > > > >> > >
> > > > > > > >> > >> Hi All,
> > > > > > > >> > >>
> > > > > > > >> > >> Thanks for the discussion. I've been considering the
> idea
> > > of
> > > > > > giving
> > > > > > > >> the
> > > > > > > >> > >> "catching up" tasks a different name/role. I was in
> favor
> > > > > > > initially,
> > > > > > > >> but
> > > > > > > >> > >> after working though some details, I think it causes
> some
> > > > > > problems,
> > > > > > > >> > which
> > > > > > > >> > >> I've written up in the "rejected alternatives" part of
> > the
> > > > KIP:
> > > > > > > >> > >>
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements
> > > > > > > >> > >>
> > > > > > > >> > >> Please give it a read and let me know what you think.
> > > > > > > >> > >>
> > > > > > > >> > >> Thanks,
> > > > > > > >> > >> -John
> > > > > > > >> > >>
> > > > > > > >> > >> On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang <
> > > > wangg...@gmail.com
> > > > > > >
> > > > > > > >> > wrote:
> > > > > > > >> > >>
> > > > > > > >> > >>> I think I agree with you Sophie. My gut feeling is
> that
> > 1)
> > > > it
> > > > > > > should
> > > > > > > >> > not
> > > > > > > >> > >>> be
> > > > > > > >> > >>> the major concern in assignor's algorithm for standby
> > > tasks
> > > > not
> > > > > > > >> > catching
> > > > > > > >> > >>> up, but rather be tackled in different modules, and
> 2) a
> > > > lot of
> > > > > > > >> > >>> optimization can be down at the stream thread itself,
> > like
> > > > > > > dedicated
> > > > > > > >> > >>> threading and larger batching, or even complicated
> > > > scheduling
> > > > > > > >> > mechanisms
> > > > > > > >> > >>> between running, restoring and standby tasks. In
> > anyways,
> > > I
> > > > > > think
> > > > > > > we
> > > > > > > >> > can
> > > > > > > >> > >>> take this out of the scope of KIP-441 for now.
> > > > > > > >> > >>>
> > > > > > > >> > >>>
> > > > > > > >> > >>> Guozhang
> > > > > > > >> > >>>
> > > > > > > >> > >>>
> > > > > > > >> > >>> On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman <
> > > > > > > >> > sop...@confluent.io>
> > > > > > > >> > >>> wrote:
> > > > > > > >> > >>>
> > > > > > > >> > >>> > > we may have other ways to not starving the standby
> > > > tasks,
> > > > > > for
> > > > > > > >> > >>> example, by
> > > > > > > >> > >>> > > using dedicate threads for standby tasks or even
> > > > consider
> > > > > > > having
> > > > > > > >> > >>> > *higher> priority for standby than active* so that
> we
> > > > always
> > > > > > try
> > > > > > > >> to
> > > > > > > >> > >>> caught
> > > > > > > >> > >>> > up standby
> > > > > > > >> > >>> > > first, then process active
> > > > > > > >> > >>> >
> > > > > > > >> > >>> > This is an interesting idea, but seems likely to get
> > in
> > > > the
> > > > > > way
> > > > > > > of
> > > > > > > >> > the
> > > > > > > >> > >>> > original idea of this KIP
> > > > > > > >> > >>> > -- if we always process standby tasks first, then if
> > we
> > > > are
> > > > > > > >> assigned
> > > > > > > >> > a
> > > > > > > >> > >>> new
> > > > > > > >> > >>> > standby task we
> > > > > > > >> > >>> > will have to wait for it to catch up completely
> before
> > > > > > > processing
> > > > > > > >> any
> > > > > > > >> > >>> > active tasks! That's
> > > > > > > >> > >>> > even worse than the situation this KIP is trying to
> > help
> > > > with,
> > > > > > > >> since
> > > > > > > >> > a
> > > > > > > >> > >>> new
> > > > > > > >> > >>> > standby task has to
> > > > > > > >> > >>> > restore from 0 (whereas an active task at least can
> > take
> > > > over
> > > > > > > from
> > > > > > > >> > >>> wherever
> > > > > > > >> > >>> > the standby was).
> > > > > > > >> > >>> >
> > > > > > > >> > >>> > During restoration -- while there exist any
> restoring
> > > > tasks
> > > > > > -- I
> > > > > > > >> > think
> > > > > > > >> > >>> it's
> > > > > > > >> > >>> > reasonable to de-prioritize the
> > > > > > > >> > >>> > standby tasks and just process restoring and active
> > > tasks
> > > > so
> > > > > > > both
> > > > > > > >> can
> > > > > > > >> > >>> make
> > > > > > > >> > >>> > progress. But we should
> > > > > > > >> > >>> > let them catch up afterwards somehow -- maybe we can
> > > apply
> > > > > > some
> > > > > > > >> kind
> > > > > > > >> > of
> > > > > > > >> > >>> > heuristic, like "if we haven't
> > > > > > > >> > >>> > processed standbys for X iterations, or Y
> > milliseconds,
> > > > do so
> > > > > > > >> now."
> > > > > > > >> > >>> >
> > > > > > > >> > >>> > Actually, it might even be beneficial to avoid
> > > processing
> > > > > > > >> standbys a
> > > > > > > >> > >>> record
> > > > > > > >> > >>> > or two at a time and instead
> > > > > > > >> > >>> > wait for a large enough batch to build up for the
> > > RocksDB
> > > > > > > >> > bulk-loading
> > > > > > > >> > >>> > benefits.
> > > > > > > >> > >>> >
> > > > > > > >> > >>> > I think the "use dedicated threads for standby" is
> the
> > > > more
> > > > > > > >> promising
> > > > > > > >> > >>> end
> > > > > > > >> > >>> > goal, especially since
> > > > > > > >> > >>> > if we split restoration into "restoring tasks" then
> > > > active and
> > > > > > > >> > standbys
> > > > > > > >> > >>> > share almost nothing. But
> > > > > > > >> > >>> > that seems like follow-up work to the current KIP :)
> > > > > > > >> > >>> >
> > > > > > > >> > >>> > On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman <
> > > > > > > >> > >>> sop...@confluent.io>
> > > > > > > >> > >>> > wrote:
> > > > > > > >> > >>> >
> > > > > > > >> > >>> > > Stateful tasks with logging disabled seem to be an
> > > > > > interesting
> > > > > > > >> edge
> > > > > > > >> > >>> case.
> > > > > > > >> > >>> > > On the one hand,
> > > > > > > >> > >>> > > for balancing purposes they should be considered
> > > > stateful
> > > > > > > since
> > > > > > > >> as
> > > > > > > >> > >>> > > Guozhang pointed out
> > > > > > > >> > >>> > > they are still "heavy" in IO costs. But for
> > "catching
> > > > up"
> > > > > > > >> purposes,
> > > > > > > >> > >>> ie
> > > > > > > >> > >>> > > when allocating standby
> > > > > > > >> > >>> > > tasks that will become active tasks, they should
> be
> > > > > > considered
> > > > > > > >> > >>> stateless
> > > > > > > >> > >>> > > as there is so
> > > > > > > >> > >>> > > meaningful sense of their lag. We should never
> > > allocate
> > > > > > > standby
> > > > > > > >> > >>> tasks for
> > > > > > > >> > >>> > > them during the
> > > > > > > >> > >>> > > first rebalance, but should ensure they are evenly
> > > > > > distributed
> > > > > > > >> > across
> > > > > > > >> > >>> > > instances. Maybe we
> > > > > > > >> > >>> > > should split these into a third category -- after
> we
> > > > assign
> > > > > > > all
> > > > > > > >> > >>> stateful
> > > > > > > >> > >>> > > tasks with logging, we
> > > > > > > >> > >>> > > then distribute the set of logging-disabled
> stateful
> > > > tasks
> > > > > > to
> > > > > > > >> > improve
> > > > > > > >> > >>> > > balance, before lastly
> > > > > > > >> > >>> > > distributing stateless tasks?
> > > > > > > >> > >>> > >
> > > > > > > >> > >>> > > This actually leads into what I was just thinking,
> > > > which is
> > > > > > > >> that we
> > > > > > > >> > >>> > really
> > > > > > > >> > >>> > > should distinguish the
> > > > > > > >> > >>> > > "catch-up" standbys from normal standbys as well
> as
> > > > > > > >> distinguishing
> > > > > > > >> > >>> > > actively processing tasks
> > > > > > > >> > >>> > > from active tasks that are still in the restore
> > phase.
> > > > It's
> > > > > > > >> > somewhat
> > > > > > > >> > >>> > > awkward that today, some
> > > > > > > >> > >>> > > active tasks just start processing immediately
> while
> > > > others
> > > > > > > >> behave
> > > > > > > >> > >>> more
> > > > > > > >> > >>> > > like standby than active
> > > > > > > >> > >>> > > tasks for some time, before switching to real
> > active.
> > > > They
> > > > > > > first
> > > > > > > >> > use
> > > > > > > >> > >>> the
> > > > > > > >> > >>> > > restoreConsumer, then
> > > > > > > >> > >>> > > later only the "normal" consumer.
> > > > > > > >> > >>> > >
> > > > > > > >> > >>> > > However, this restore period is still distinct
> from
> > > > normal
> > > > > > > >> standbys
> > > > > > > >> > >>> in a
> > > > > > > >> > >>> > > lot of ways -- the code path
> > > > > > > >> > >>> > > for restoring is different than for updating
> > standbys,
> > > > for
> > > > > > > >> example
> > > > > > > >> > >>> in how
> > > > > > > >> > >>> > > long we block on #poll.
> > > > > > > >> > >>> > > So in addition to giving them their own name --
> > let's
> > > go
> > > > > > with
> > > > > > > >> > >>> restoring
> > > > > > > >> > >>> > > task for now -- they really
> > > > > > > >> > >>> > > do seem to deserve being their own distinct task.
> We
> > > can
> > > > > > > >> optimize
> > > > > > > >> > >>> them
> > > > > > > >> > >>> > for
> > > > > > > >> > >>> > > efficient conversion
> > > > > > > >> > >>> > > to active tasks since we know that's what they
> will
> > > be.
> > > > > > > >> > >>> > >
> > > > > > > >> > >>> > > This resolves some of the awkwardness of dealing
> > with
> > > > the
> > > > > > > >> special
> > > > > > > >> > >>> case
> > > > > > > >> > >>> > > mentioned above: we
> > > > > > > >> > >>> > > find a balanced assignment of stateful and
> stateless
> > > > tasks,
> > > > > > > and
> > > > > > > >> > >>> create
> > > > > > > >> > >>> > > restoring tasks as needed.
> > > > > > > >> > >>> > > If logging is disabled, no restoring task is
> > created.
> > > > > > > >> > >>> > >
> > > > > > > >> > >>> > >
> > > > > > > >> > >>> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang <
> > > > > > > >> wangg...@gmail.com>
> > > > > > > >> > >>> wrote:
> > > > > > > >> > >>> > >
> > > > > > > >> > >>> > >> Regarding 3) above: I think for active task they
> > > should
> > > > > > still
> > > > > > > >> be
> > > > > > > >> > >>> > >> considered
> > > > > > > >> > >>> > >> stateful since the processor would still pay IO
> > cost
> > > > > > > accessing
> > > > > > > >> the
> > > > > > > >> > >>> > store,
> > > > > > > >> > >>> > >> but they would not have standby tasks?
> > > > > > > >> > >>> > >>
> > > > > > > >> > >>> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna <
> > > > > > > >> br...@confluent.io>
> > > > > > > >> > >>> > wrote:
> > > > > > > >> > >>> > >>
> > > > > > > >> > >>> > >> > Hi,
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >> > Thank you for the KIP!
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >> > Some questions/comments:
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >> > 1. I am wondering if the "stand-by" tasks that
> > > catch
> > > > up
> > > > > > > state
> > > > > > > >> > >>> before
> > > > > > > >> > >>> > >> > the active task is switched deserve its own
> name
> > in
> > > > this
> > > > > > > KIP
> > > > > > > >> and
> > > > > > > >> > >>> maybe
> > > > > > > >> > >>> > >> > in the code. We have already stated that they
> are
> > > not
> > > > > > true
> > > > > > > >> > >>> stand-by
> > > > > > > >> > >>> > >> > tasks, they are not configured through
> > > > > > > >> `num.standby.replicas`,
> > > > > > > >> > and
> > > > > > > >> > >>> > >> > maybe they have also other properties that
> > > > distinguish
> > > > > > them
> > > > > > > >> from
> > > > > > > >> > >>> true
> > > > > > > >> > >>> > >> > stand-by tasks of which we are not aware yet.
> For
> > > > > > example,
> > > > > > > >> they
> > > > > > > >> > >>> may be
> > > > > > > >> > >>> > >> > prioritized differently than other tasks.
> > > > Furthermore,
> > > > > > the
> > > > > > > >> name
> > > > > > > >> > >>> > >> > "stand-by" does not really fit with the planned
> > > > > > > >> functionality of
> > > > > > > >> > >>> those
> > > > > > > >> > >>> > >> > tasks. In the following, I will call them false
> > > > stand-by
> > > > > > > >> tasks.
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >> > 2. Did you consider to trigger the probing
> > > > rebalances not
> > > > > > > at
> > > > > > > >> > >>> regular
> > > > > > > >> > >>> > >> > time intervals but when the false stand-by
> tasks
> > > > reach an
> > > > > > > >> > >>> acceptable
> > > > > > > >> > >>> > >> > lag? If you did consider, could you add a
> > paragraph
> > > > why
> > > > > > you
> > > > > > > >> > >>> rejected
> > > > > > > >> > >>> > >> > this idea to the "Rejected Alternatives"
> section.
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >> > 3. Are tasks that solely contain stores with
> > > disabled
> > > > > > > logging
> > > > > > > >> > >>> > >> > classified as stateful or stateless in the
> > > > algorithm? I
> > > > > > > would
> > > > > > > >> > >>> guess
> > > > > > > >> > >>> > >> > stateless, although if possible they should be
> > > > assigned
> > > > > > to
> > > > > > > >> the
> > > > > > > >> > >>> same
> > > > > > > >> > >>> > >> > instance they had run before the rebalance. As
> > far
> > > > as I
> > > > > > can
> > > > > > > >> see
> > > > > > > >> > >>> this
> > > > > > > >> > >>> > >> > special case is not handled in the algorithm.
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >> > Best,
> > > > > > > >> > >>> > >> > Bruno
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang <
> > > > > > > >> > wangg...@gmail.com>
> > > > > > > >> > >>> > >> wrote:
> > > > > > > >> > >>> > >> > >
> > > > > > > >> > >>> > >> > > 1. Sounds good, just wanted to clarify; and
> it
> > > may
> > > > > > worth
> > > > > > > >> > >>> documenting
> > > > > > > >> > >>> > >> it
> > > > > > > >> > >>> > >> > so
> > > > > > > >> > >>> > >> > > that users would not be surprised when
> > monitoring
> > > > their
> > > > > > > >> > >>> footprint.
> > > > > > > >> > >>> > >> > >
> > > > > > > >> > >>> > >> > > 2. Hmm I see... I think the trade-off can be
> > > > described
> > > > > > as
> > > > > > > >> "how
> > > > > > > >> > >>> much
> > > > > > > >> > >>> > >> > > imbalance would bother you to be willing to
> pay
> > > > another
> > > > > > > >> > >>> rebalance,
> > > > > > > >> > >>> > >> along
> > > > > > > >> > >>> > >> > > with potentially more restoration lag", and
> the
> > > > current
> > > > > > > >> > >>> definition
> > > > > > > >> > >>> > of
> > > > > > > >> > >>> > >> > > rebalance_factor can be considered as a rough
> > > > > > measurement
> > > > > > > >> of
> > > > > > > >> > >>> that
> > > > > > > >> > >>> > >> > > imbalance. Of course one can argue that a
> finer
> > > > grained
> > > > > > > >> > >>> measurement
> > > > > > > >> > >>> > >> could
> > > > > > > >> > >>> > >> > > be "resource footprint" like CPU / storage of
> > > each
> > > > > > > instance
> > > > > > > >> > >>> like we
> > > > > > > >> > >>> > >> have
> > > > > > > >> > >>> > >> > in
> > > > > > > >> > >>> > >> > > Kafka broker auto balancing tools, but I'd
> > prefer
> > > > not
> > > > > > > doing
> > > > > > > >> > >>> that as
> > > > > > > >> > >>> > >> part
> > > > > > > >> > >>> > >> > of
> > > > > > > >> > >>> > >> > > the library but more as an operational tool
> in
> > > the
> > > > > > > future.
> > > > > > > >> On
> > > > > > > >> > >>> the
> > > > > > > >> > >>> > >> other
> > > > > > > >> > >>> > >> > > hand, I've seen stateful and stateless tasks
> > > having
> > > > > > very
> > > > > > > >> > >>> different
> > > > > > > >> > >>> > >> load,
> > > > > > > >> > >>> > >> > > and sometimes the only bottleneck of a
> Streams
> > > app
> > > > is
> > > > > > > just
> > > > > > > >> one
> > > > > > > >> > >>> > >> stateful
> > > > > > > >> > >>> > >> > > sub-topology and whoever gets tasks of that
> > > > > > sub-topology
> > > > > > > >> > become
> > > > > > > >> > >>> > >> hotspot
> > > > > > > >> > >>> > >> > > (and that's why our algorithm tries to
> balance
> > > per
> > > > > > > >> > sub-topology
> > > > > > > >> > >>> as
> > > > > > > >> > >>> > >> well),
> > > > > > > >> > >>> > >> > > so maybe we can just consider stateful tasks
> > when
> > > > > > > >> calculating
> > > > > > > >> > >>> this
> > > > > > > >> > >>> > >> factor
> > > > > > > >> > >>> > >> > > as a very brute force heuristic?
> > > > > > > >> > >>> > >> > >
> > > > > > > >> > >>> > >> > > 3.a. Thinking about this a bit more, maybe
> it's
> > > > better
> > > > > > > not
> > > > > > > >> try
> > > > > > > >> > >>> to
> > > > > > > >> > >>> > >> tackle
> > > > > > > >> > >>> > >> > an
> > > > > > > >> > >>> > >> > > unseen enemy just yet, and observe if it
> really
> > > > emerges
> > > > > > > >> later,
> > > > > > > >> > >>> and
> > > > > > > >> > >>> > by
> > > > > > > >> > >>> > >> > then
> > > > > > > >> > >>> > >> > > we may have other ways to not starving the
> > > standby
> > > > > > tasks,
> > > > > > > >> for
> > > > > > > >> > >>> > >> example, by
> > > > > > > >> > >>> > >> > > using dedicate threads for standby tasks or
> > even
> > > > > > consider
> > > > > > > >> > having
> > > > > > > >> > >>> > >> higher
> > > > > > > >> > >>> > >> > > priority for standby than active so that we
> > > always
> > > > try
> > > > > > to
> > > > > > > >> > >>> caught up
> > > > > > > >> > >>> > >> > standby
> > > > > > > >> > >>> > >> > > first, then process active; and if active's
> > > lagging
> > > > > > > >> compared
> > > > > > > >> > to
> > > > > > > >> > >>> > >> > > log-end-offset is increasing then we should
> > > > increase
> > > > > > > >> capacity,
> > > > > > > >> > >>> etc
> > > > > > > >> > >>> > >> etc.
> > > > > > > >> > >>> > >> > >
> > > > > > > >> > >>> > >> > > 4. Actually with KIP-429 this may not be the
> > > case:
> > > > we
> > > > > > may
> > > > > > > >> not
> > > > > > > >> > >>> call
> > > > > > > >> > >>> > >> > > onPartitionsRevoked prior to rebalance any
> more
> > > so
> > > > > > would
> > > > > > > >> not
> > > > > > > >> > >>> transit
> > > > > > > >> > >>> > >> > state
> > > > > > > >> > >>> > >> > > to PARTITIONS_REVOKED, and hence not cause
> the
> > > > state of
> > > > > > > the
> > > > > > > >> > >>> instance
> > > > > > > >> > >>> > >> to
> > > > > > > >> > >>> > >> > be
> > > > > > > >> > >>> > >> > > REBALANCING. In other words, even if a
> instance
> > > is
> > > > > > > >> undergoing
> > > > > > > >> > a
> > > > > > > >> > >>> > >> rebalance
> > > > > > > >> > >>> > >> > > it's state may still be RUNNING and it may
> > still
> > > be
> > > > > > > >> processing
> > > > > > > >> > >>> > >> records at
> > > > > > > >> > >>> > >> > > the same time.
> > > > > > > >> > >>> > >> > >
> > > > > > > >> > >>> > >> > >
> > > > > > > >> > >>> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John Roesler
> <
> > > > > > > >> > j...@confluent.io
> > > > > > > >> > >>> >
> > > > > > > >> > >>> > >> wrote:
> > > > > > > >> > >>> > >> > >
> > > > > > > >> > >>> > >> > > > Hey Guozhang,
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > Thanks for the review!
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > 1. Yes, even with `num.standby.replicas :=
> > 0`,
> > > we
> > > > > > will
> > > > > > > >> still
> > > > > > > >> > >>> > >> > temporarily
> > > > > > > >> > >>> > >> > > > allocate standby tasks to accomplish a
> > > > no-downtime
> > > > > > task
> > > > > > > >> > >>> migration.
> > > > > > > >> > >>> > >> > > > Although, I'd argue that this doesn't
> really
> > > > violate
> > > > > > > the
> > > > > > > >> > >>> config,
> > > > > > > >> > >>> > as
> > > > > > > >> > >>> > >> the
> > > > > > > >> > >>> > >> > > > task isn't a true hot standby. As soon as
> it
> > > > catches
> > > > > > > up,
> > > > > > > >> > we'll
> > > > > > > >> > >>> > >> > rebalance
> > > > > > > >> > >>> > >> > > > again, that task will become active, and
> the
> > > > original
> > > > > > > >> > instance
> > > > > > > >> > >>> > that
> > > > > > > >> > >>> > >> > hosted
> > > > > > > >> > >>> > >> > > > the active task will no longer have the
> task
> > > > assigned
> > > > > > > at
> > > > > > > >> > all.
> > > > > > > >> > >>> Once
> > > > > > > >> > >>> > >> the
> > > > > > > >> > >>> > >> > > > stateDirCleaner kicks in, we'll free the
> disk
> > > > space
> > > > > > > from
> > > > > > > >> it,
> > > > > > > >> > >>> and
> > > > > > > >> > >>> > >> > return to
> > > > > > > >> > >>> > >> > > > the steady-state of having just one copy of
> > the
> > > > task
> > > > > > in
> > > > > > > >> the
> > > > > > > >> > >>> > cluster.
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > We can of course do without this, but I
> feel
> > > the
> > > > > > > current
> > > > > > > >> > >>> proposal
> > > > > > > >> > >>> > is
> > > > > > > >> > >>> > >> > > > operationally preferable, since it doesn't
> > make
> > > > > > > >> configuring
> > > > > > > >> > >>> > >> > hot-standbys a
> > > > > > > >> > >>> > >> > > > pre-requisite for fast rebalances.
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > 2. Yes, I think your interpretation is what
> > we
> > > > > > > intended.
> > > > > > > >> The
> > > > > > > >> > >>> > default
> > > > > > > >> > >>> > >> > > > balance_factor would be 1, as it is
> > implicitly
> > > > today.
> > > > > > > >> What
> > > > > > > >> > >>> this
> > > > > > > >> > >>> > >> does is
> > > > > > > >> > >>> > >> > > > allows operators to trade off less balanced
> > > > > > assignments
> > > > > > > >> > >>> against
> > > > > > > >> > >>> > >> fewer
> > > > > > > >> > >>> > >> > > > rebalances. If you have lots of space
> > capacity
> > > in
> > > > > > your
> > > > > > > >> > >>> instances,
> > > > > > > >> > >>> > >> this
> > > > > > > >> > >>> > >> > may
> > > > > > > >> > >>> > >> > > > be a perfectly fine tradeoff, and you may
> > > prefer
> > > > for
> > > > > > > >> Streams
> > > > > > > >> > >>> not
> > > > > > > >> > >>> > to
> > > > > > > >> > >>> > >> > bother
> > > > > > > >> > >>> > >> > > > streaming GBs of data from the broker in
> > > pursuit
> > > > of
> > > > > > > >> perfect
> > > > > > > >> > >>> > balance.
> > > > > > > >> > >>> > >> > Not
> > > > > > > >> > >>> > >> > > > married to this configuration, though. It
> was
> > > > > > inspired
> > > > > > > by
> > > > > > > >> > the
> > > > > > > >> > >>> > >> related
> > > > > > > >> > >>> > >> > work
> > > > > > > >> > >>> > >> > > > research we did.
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > 3. I'll take a look
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > 3a. I think this is a good idea. I'd
> classify
> > > it
> > > > as a
> > > > > > > >> type
> > > > > > > >> > of
> > > > > > > >> > >>> grey
> > > > > > > >> > >>> > >> > failure
> > > > > > > >> > >>> > >> > > > detection. It may make more sense to tackle
> > > grey
> > > > > > > >> failures as
> > > > > > > >> > >>> part
> > > > > > > >> > >>> > of
> > > > > > > >> > >>> > >> > the
> > > > > > > >> > >>> > >> > > > heartbeat protocol (as I POCed here:
> > > > > > > >> > >>> > >> > > >
> > > https://github.com/apache/kafka/pull/7096/files
> > > > ).
> > > > > > > WDYT?
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > 4. Good catch! I didn't think about that
> > > before.
> > > > > > > Looking
> > > > > > > >> at
> > > > > > > >> > it
> > > > > > > >> > >>> > now,
> > > > > > > >> > >>> > >> > though,
> > > > > > > >> > >>> > >> > > > I wonder if we're actually protected
> already.
> > > The
> > > > > > > >> > >>> stateDirCleaner
> > > > > > > >> > >>> > >> > thread
> > > > > > > >> > >>> > >> > > > only executes if the instance is in RUNNING
> > > > state,
> > > > > > and
> > > > > > > >> > KIP-441
> > > > > > > >> > >>> > >> > proposes to
> > > > > > > >> > >>> > >> > > > use "probing rebalances" to report task
> lag.
> > > > Hence,
> > > > > > > >> during
> > > > > > > >> > the
> > > > > > > >> > >>> > >> window
> > > > > > > >> > >>> > >> > > > between when the instance reports a lag and
> > the
> > > > > > > assignor
> > > > > > > >> > >>> makes a
> > > > > > > >> > >>> > >> > decision
> > > > > > > >> > >>> > >> > > > about it, the instance should remain in
> > > > REBALANCING
> > > > > > > >> state,
> > > > > > > >> > >>> right?
> > > > > > > >> > >>> > If
> > > > > > > >> > >>> > >> > so,
> > > > > > > >> > >>> > >> > > > then this should prevent the race
> condition.
> > If
> > > > not,
> > > > > > > >> then we
> > > > > > > >> > >>> do
> > > > > > > >> > >>> > >> indeed
> > > > > > > >> > >>> > >> > need
> > > > > > > >> > >>> > >> > > > to do something about it.
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > 5. Good idea. I think that today, you can
> > only
> > > > see
> > > > > > the
> > > > > > > >> > >>> consumer
> > > > > > > >> > >>> > lag,
> > > > > > > >> > >>> > >> > which
> > > > > > > >> > >>> > >> > > > is a poor substitute. I'll add some metrics
> > to
> > > > the
> > > > > > > >> proposal.
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > Thanks again for the comments!
> > > > > > > >> > >>> > >> > > > -John
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM Guozhang
> Wang
> > <
> > > > > > > >> > >>> wangg...@gmail.com>
> > > > > > > >> > >>> > >> > wrote:
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > > > > Hello Sophie,
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > Thanks for the proposed KIP. I left some
> > > > comments
> > > > > > on
> > > > > > > >> the
> > > > > > > >> > >>> wiki
> > > > > > > >> > >>> > >> itself,
> > > > > > > >> > >>> > >> > > > and I
> > > > > > > >> > >>> > >> > > > > think I'm still not very clear on a
> couple
> > or
> > > > > > those:
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > 1. With this proposal, does that mean
> with
> > > > > > > >> > >>> num.standby.replicas
> > > > > > > >> > >>> > ==
> > > > > > > >> > >>> > >> > 0, we
> > > > > > > >> > >>> > >> > > > > may sometimes still have some standby
> tasks
> > > > which
> > > > > > may
> > > > > > > >> > >>> violate
> > > > > > > >> > >>> > the
> > > > > > > >> > >>> > >> > config?
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > 2. I think I understand the rationale to
> > > > consider
> > > > > > > lags
> > > > > > > >> > that
> > > > > > > >> > >>> is
> > > > > > > >> > >>> > >> below
> > > > > > > >> > >>> > >> > the
> > > > > > > >> > >>> > >> > > > > specified threshold to be equal, rather
> > than
> > > > still
> > > > > > > >> > >>> considering
> > > > > > > >> > >>> > >> 5000
> > > > > > > >> > >>> > >> > is
> > > > > > > >> > >>> > >> > > > > better than 5001 -- we do not want to
> > > > > > "over-optimize"
> > > > > > > >> and
> > > > > > > >> > >>> > >> potentially
> > > > > > > >> > >>> > >> > > > falls
> > > > > > > >> > >>> > >> > > > > into endless rebalances back and forth.
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > But I'm not clear about the rationale of
> > the
> > > > second
> > > > > > > >> > >>> parameter of
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>>
> > > > constrainedBalancedAssignment(StatefulTasksToRankedCandidates,
> > > > > > > >> > >>> > >> > > > > balance_factor):
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > Does that mean, e.g. with balance_factor
> of
> > > 3,
> > > > we'd
> > > > > > > >> > >>> consider two
> > > > > > > >> > >>> > >> > > > > assignments one resulting balance_factor
> 0
> > > and
> > > > one
> > > > > > > >> > resulting
> > > > > > > >> > >>> > >> > > > balance_factor
> > > > > > > >> > >>> > >> > > > > 3 to be equally optimized assignment and
> > > > therefore
> > > > > > > may
> > > > > > > >> > "stop
> > > > > > > >> > >>> > >> early"?
> > > > > > > >> > >>> > >> > This
> > > > > > > >> > >>> > >> > > > > was not very convincing to me :P
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > 3. There are a couple of minor comments
> > about
> > > > the
> > > > > > > >> > algorithm
> > > > > > > >> > >>> > >> itself,
> > > > > > > >> > >>> > >> > left
> > > > > > > >> > >>> > >> > > > on
> > > > > > > >> > >>> > >> > > > > the wiki page since it needs to refer to
> > the
> > > > exact
> > > > > > > line
> > > > > > > >> > and
> > > > > > > >> > >>> > better
> > > > > > > >> > >>> > >> > > > > displayed there.
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > 3.a Another wild thought about the
> > threshold
> > > > > > itself:
> > > > > > > >> today
> > > > > > > >> > >>> the
> > > > > > > >> > >>> > >> > assignment
> > > > > > > >> > >>> > >> > > > > itself is memoryless, so we would not
> know
> > if
> > > > the
> > > > > > > >> reported
> > > > > > > >> > >>> > >> `TaskLag`
> > > > > > > >> > >>> > >> > > > itself
> > > > > > > >> > >>> > >> > > > > is increasing or decreasing even if the
> > > current
> > > > > > value
> > > > > > > >> is
> > > > > > > >> > >>> under
> > > > > > > >> > >>> > the
> > > > > > > >> > >>> > >> > > > > threshold. I wonder if it worthy to make
> > it a
> > > > bit
> > > > > > > more
> > > > > > > >> > >>> > >> complicated to
> > > > > > > >> > >>> > >> > > > track
> > > > > > > >> > >>> > >> > > > > task lag trend at the assignor?
> Practically
> > > it
> > > > may
> > > > > > > not
> > > > > > > >> be
> > > > > > > >> > >>> very
> > > > > > > >> > >>> > >> > uncommon
> > > > > > > >> > >>> > >> > > > > that stand-by tasks are not keeping up
> due
> > to
> > > > the
> > > > > > > fact
> > > > > > > >> > that
> > > > > > > >> > >>> > other
> > > > > > > >> > >>> > >> > active
> > > > > > > >> > >>> > >> > > > > tasks hosted on the same thread is
> starving
> > > the
> > > > > > > standby
> > > > > > > >> > >>> tasks.
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > 4. There's a potential race condition
> risk
> > > when
> > > > > > > >> reporting
> > > > > > > >> > >>> > >> `TaskLags`
> > > > > > > >> > >>> > >> > in
> > > > > > > >> > >>> > >> > > > the
> > > > > > > >> > >>> > >> > > > > subscription: right after reporting it to
> > the
> > > > > > leader,
> > > > > > > >> the
> > > > > > > >> > >>> > cleanup
> > > > > > > >> > >>> > >> > thread
> > > > > > > >> > >>> > >> > > > > kicks in and deletes the state directory.
> > If
> > > > the
> > > > > > task
> > > > > > > >> was
> > > > > > > >> > >>> > assigned
> > > > > > > >> > >>> > >> > to the
> > > > > > > >> > >>> > >> > > > > host it would cause it to restore from
> > > > beginning
> > > > > > and
> > > > > > > >> > >>> effectively
> > > > > > > >> > >>> > >> > make the
> > > > > > > >> > >>> > >> > > > > seemingly optimized assignment very
> > > > sub-optimal.
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > To be on the safer side we should
> consider
> > > > either
> > > > > > > prune
> > > > > > > >> > out
> > > > > > > >> > >>> > those
> > > > > > > >> > >>> > >> > tasks
> > > > > > > >> > >>> > >> > > > > that are "close to be cleaned up" in the
> > > > > > > subscription,
> > > > > > > >> or
> > > > > > > >> > we
> > > > > > > >> > >>> > >> should
> > > > > > > >> > >>> > >> > delay
> > > > > > > >> > >>> > >> > > > > the cleanup right after we've included
> them
> > > in
> > > > the
> > > > > > > >> > >>> subscription
> > > > > > > >> > >>> > in
> > > > > > > >> > >>> > >> > case
> > > > > > > >> > >>> > >> > > > > they are been selected as assigned tasks
> by
> > > the
> > > > > > > >> assignor.
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > 5. This is a meta comment: I think it
> would
> > > be
> > > > > > > helpful
> > > > > > > >> to
> > > > > > > >> > >>> add
> > > > > > > >> > >>> > some
> > > > > > > >> > >>> > >> > user
> > > > > > > >> > >>> > >> > > > > visibility on the standby tasks lagging
> as
> > > > well,
> > > > > > via
> > > > > > > >> > >>> metrics for
> > > > > > > >> > >>> > >> > example.
> > > > > > > >> > >>> > >> > > > > Today it is hard for us to observe how
> far
> > > are
> > > > our
> > > > > > > >> current
> > > > > > > >> > >>> > standby
> > > > > > > >> > >>> > >> > tasks
> > > > > > > >> > >>> > >> > > > > compared to the active tasks and whether
> > that
> > > > lag
> > > > > > is
> > > > > > > >> being
> > > > > > > >> > >>> > >> > increasing or
> > > > > > > >> > >>> > >> > > > > decreasing. As a follow-up task, for
> > example,
> > > > the
> > > > > > > >> > rebalance
> > > > > > > >> > >>> > should
> > > > > > > >> > >>> > >> > also
> > > > > > > >> > >>> > >> > > > be
> > > > > > > >> > >>> > >> > > > > triggered if we realize that some standby
> > > > task's
> > > > > > lag
> > > > > > > is
> > > > > > > >> > >>> > increasing
> > > > > > > >> > >>> > >> > > > > indefinitely means that it cannot keep up
> > > > (which is
> > > > > > > >> > another
> > > > > > > >> > >>> > >> indicator
> > > > > > > >> > >>> > >> > > > > either you need to add more resources
> with
> > > the
> > > > > > > >> > num.standbys
> > > > > > > >> > >>> or
> > > > > > > >> > >>> > >> your
> > > > > > > >> > >>> > >> > are
> > > > > > > >> > >>> > >> > > > > still not balanced enough).
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM Sophie
> > > > Blee-Goldman
> > > > > > <
> > > > > > > >> > >>> > >> > sop...@confluent.io>
> > > > > > > >> > >>> > >> > > > > wrote:
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > > Hey all,
> > > > > > > >> > >>> > >> > > > > >
> > > > > > > >> > >>> > >> > > > > > I'd like to kick off discussion on
> > KIP-441,
> > > > aimed
> > > > > > > at
> > > > > > > >> the
> > > > > > > >> > >>> long
> > > > > > > >> > >>> > >> > restore
> > > > > > > >> > >>> > >> > > > > times
> > > > > > > >> > >>> > >> > > > > > in Streams during which further active
> > > > processing
> > > > > > > >> and IQ
> > > > > > > >> > >>> are
> > > > > > > >> > >>> > >> > blocked.
> > > > > > > >> > >>> > >> > > > > > Please give it a read and let us know
> > your
> > > > > > thoughts
> > > > > > > >> > >>> > >> > > > > >
> > > > > > > >> > >>> > >> > > > > >
> > > > > > > >> > >>> > >> > > > > >
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >>
> > > > > > > >> > >>> >
> > > > > > > >> > >>>
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > > > > > > >> > >>> > >> > > > > >
> > > > > > > >> > >>> > >> > > > > > Cheers,
> > > > > > > >> > >>> > >> > > > > > Sophie
> > > > > > > >> > >>> > >> > > > > >
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > > > --
> > > > > > > >> > >>> > >> > > > > -- Guozhang
> > > > > > > >> > >>> > >> > > > >
> > > > > > > >> > >>> > >> > > >
> > > > > > > >> > >>> > >> > >
> > > > > > > >> > >>> > >> > >
> > > > > > > >> > >>> > >> > > --
> > > > > > > >> > >>> > >> > > -- Guozhang
> > > > > > > >> > >>> > >> >
> > > > > > > >> > >>> > >>
> > > > > > > >> > >>> > >>
> > > > > > > >> > >>> > >> --
> > > > > > > >> > >>> > >> -- Guozhang
> > > > > > > >> > >>> > >>
> > > > > > > >> > >>> > >
> > > > > > > >> > >>> >
> > > > > > > >> > >>>
> > > > > > > >> > >>>
> > > > > > > >> > >>> --
> > > > > > > >> > >>> -- Guozhang
> > > > > > > >> > >>>
> > > > > > > >> > >>
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >>
> > > > > > > >> --
> > > > > > > >> -- Guozhang
> > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang

Reply via email to