Hi All,

Thanks for the discussion. I've been considering the idea of giving the
"catching up" tasks a different name/role. I was in favor initially, but
after working through some details, I think it causes some problems, which
I've written up in the "rejected alternatives" part of the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements

Please give it a read and let me know what you think.

Thanks,
-John

On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang <wangg...@gmail.com> wrote:

> I think I agree with you Sophie. My gut feeling is that 1) standby tasks
> not catching up should not be a major concern of the assignor's
> algorithm, but rather should be tackled in different modules, and 2) a
> lot of optimization can be done at the stream thread itself, like
> dedicated threading and larger batching, or even complicated scheduling
> mechanisms between running, restoring and standby tasks. In any case, I
> think we can take this out of the scope of KIP-441 for now.
>
>
> Guozhang
>
>
> On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
> > > we may have other ways to not starve the standby tasks, for example,
> > > by using dedicated threads for standby tasks or even consider having
> > > *higher priority for standby than active* so that we always try to
> > > catch up standby first, then process active
> >
> > This is an interesting idea, but seems likely to get in the way of the
> > original idea of this KIP -- if we always process standby tasks first,
> > then if we are assigned a new standby task we will have to wait for it
> > to catch up completely before processing any active tasks! That's even
> > worse than the situation this KIP is trying to help with, since a new
> > standby task has to restore from 0 (whereas an active task at least
> > can take over from wherever the standby was).
> >
> > During restoration -- while there exist any restoring tasks -- I think
> > it's reasonable to de-prioritize the standby tasks and just process
> > restoring and active tasks so both can make progress. But we should
> > let them catch up afterwards somehow -- maybe we can apply some kind
> > of heuristic, like "if we haven't processed standbys for X iterations,
> > or Y milliseconds, do so now."
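> >
> > To make that concrete, a minimal sketch of the check in the thread's
> > main loop (all names here are hypothetical, not existing Streams APIs):
> >
> >     // Process standbys if we've skipped them for X iterations or Y ms.
> >     if (standbySkippedIterations >= maxSkippedIterations
> >         || now - lastStandbyProcessMs >= maxStandbyPauseMs) {
> >         processStandbyTasks();
> >         standbySkippedIterations = 0;
> >         lastStandbyProcessMs = now;
> >     } else {
> >         standbySkippedIterations++;
> >     }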
> >
> > Actually, it might even be beneficial to avoid processing standbys a
> > record or two at a time and instead wait for a large enough batch to
> > build up for the RocksDB bulk-loading benefits.
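> >
> > For instance (again just a sketch -- the buffer, the batch-size knob,
> > and the update call are hypothetical, not part of today's code):
> >
> >     // Accumulate polled standby records and apply them in one bulk
> >     // write so RocksDB can ingest them efficiently.
> >     private final List<ConsumerRecord<byte[], byte[]>> buffer = new ArrayList<>();
> >
> >     void maybeUpdateStandby(final ConsumerRecords<byte[], byte[]> polled) {
> >         polled.forEach(buffer::add);
> >         if (buffer.size() >= bulkLoadBatchSize) {
> >             standbyTask.update(buffer); // single batched restore call
> >             buffer.clear();
> >         }
> >     }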
> >
> > I think the "use dedicated threads for standby" is the more promising
> > end goal, especially since if we split restoration into "restoring
> > tasks" then active and standbys share almost nothing. But that seems
> > like follow-up work to the current KIP :)
> >
> > On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> >
> > > Stateful tasks with logging disabled seem to be an interesting edge
> > > case. On the one hand, for balancing purposes they should be
> > > considered stateful since, as Guozhang pointed out, they are still
> > > "heavy" in IO costs. But for "catching up" purposes, i.e. when
> > > allocating standby tasks that will become active tasks, they should
> > > be considered stateless as there is no meaningful sense of their
> > > lag. We should never allocate standby tasks for them during the
> > > first rebalance, but should ensure they are evenly distributed
> > > across instances. Maybe we should split these into a third category
> > > -- after we assign all stateful tasks with logging, we then
> > > distribute the set of logging-disabled stateful tasks to improve
> > > balance, before lastly distributing stateless tasks?
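> > >
> > > In pseudocode, the three-pass ordering I'm imagining (a sketch only
> > > -- none of these helpers exist today):
> > >
> > >     assignStatefulLoggedTasks(tasks, instances);   // sticky, get standbys
> > >     assignStatefulUnloggedTasks(tasks, instances); // sticky, no standbys
> > >     assignStatelessTasks(tasks, instances);        // purely for balance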
> > >
> > > This actually leads into what I was just thinking, which is that we
> > > really should distinguish the "catch-up" standbys from normal
> > > standbys, as well as distinguishing actively processing tasks from
> > > active tasks that are still in the restore phase. It's somewhat
> > > awkward that today, some active tasks just start processing
> > > immediately while others behave more like standby than active tasks
> > > for some time, before switching to real active. They first use the
> > > restoreConsumer, then later only the "normal" consumer.
> > >
> > > However, this restore period is still distinct from normal standbys
> > > in a lot of ways -- the code path for restoring is different than
> > > for updating standbys, for example in how long we block on #poll. So
> > > in addition to giving them their own name -- let's go with
> > > "restoring task" for now -- they really do seem to deserve to be
> > > their own distinct task type. We can optimize them for efficient
> > > conversion to active tasks since we know that's what they will be.
> > >
> > > This resolves some of the awkwardness of dealing with the special case
> > > mentioned above: we
> > > find a balanced assignment of stateful and stateless tasks, and create
> > > restoring tasks as needed.
> > > If logging is disabled, no restoring task is created.
> > >
> > >
> > > On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> Regarding 3) above: I think for active tasks they should still be
> > >> considered stateful since the processor would still pay IO cost
> > >> accessing the store, but they would not have standby tasks?
> > >>
> > >> On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna <br...@confluent.io> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > Thank you for the KIP!
> > >> >
> > >> > Some questions/comments:
> > >> >
> > >> > 1. I am wondering if the "stand-by" tasks that catch up state
> > >> > before the active task is switched deserve their own name in this
> > >> > KIP and maybe in the code. We have already stated that they are
> > >> > not true stand-by tasks, they are not configured through
> > >> > `num.standby.replicas`, and maybe they have also other properties
> > >> > that distinguish them from true stand-by tasks of which we are not
> > >> > aware yet. For example, they may be prioritized differently than
> > >> > other tasks. Furthermore, the name "stand-by" does not really fit
> > >> > with the planned functionality of those tasks. In the following, I
> > >> > will call them false stand-by tasks.
> > >> >
> > >> > 2. Did you consider triggering the probing rebalances not at
> > >> > regular time intervals but when the false stand-by tasks reach an
> > >> > acceptable lag? If you did, could you add a paragraph explaining
> > >> > why you rejected this idea to the "Rejected Alternatives" section?
> > >> >
> > >> > 3. Are tasks that solely contain stores with disabled logging
> > >> > classified as stateful or stateless in the algorithm? I would
> > >> > guess stateless, although if possible they should be assigned to
> > >> > the same instance they ran on before the rebalance. As far as I
> > >> > can see this special case is not handled in the algorithm.
> > >> >
> > >> > Best,
> > >> > Bruno
> > >> >
> > >> >
> > >> >
> > >> > On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >> > >
> > >> > > 1. Sounds good, just wanted to clarify; and it may be worth
> > >> > > documenting it so that users would not be surprised when
> > >> > > monitoring their footprint.
> > >> > >
> > >> > > 2. Hmm I see... I think the trade-off can be described as "how
> > >> > > much imbalance would bother you enough to be willing to pay for
> > >> > > another rebalance, along with potentially more restoration lag",
> > >> > > and the current definition of balance_factor can be considered a
> > >> > > rough measurement of that imbalance. Of course one can argue
> > >> > > that a finer-grained measurement could be "resource footprint"
> > >> > > like CPU / storage of each instance, like we have in Kafka
> > >> > > broker auto-balancing tools, but I'd prefer not doing that as
> > >> > > part of the library but more as an operational tool in the
> > >> > > future. On the other hand, I've seen stateful and stateless
> > >> > > tasks having very different load, and sometimes the only
> > >> > > bottleneck of a Streams app is just one stateful sub-topology,
> > >> > > and whoever gets tasks of that sub-topology becomes the hotspot
> > >> > > (that's why our algorithm tries to balance per sub-topology as
> > >> > > well), so maybe we can just consider stateful tasks when
> > >> > > calculating this factor, as a very brute-force heuristic?
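> > >> > >
> > >> > > A sketch of what I mean (assuming balance_factor is the
> > >> > > max-minus-min task count across instances, here restricted to
> > >> > > stateful tasks -- the helper itself is hypothetical):
> > >> > >
> > >> > >     int statefulBalanceFactor(final Map<String, Set<TaskId>> assignment,
> > >> > >                               final Set<TaskId> statefulTasks) {
> > >> > >         final IntSummaryStatistics counts = assignment.values().stream()
> > >> > >             .mapToInt(tasks -> (int) tasks.stream()
> > >> > >                 .filter(statefulTasks::contains)
> > >> > >                 .count())
> > >> > >             .summaryStatistics();
> > >> > >         return counts.getMax() - counts.getMin();
> > >> > >     }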
> > >> > >
> > >> > > 3.a. Thinking about this a bit more, maybe it's better not to
> > >> > > try to tackle an unseen enemy just yet, and observe if it really
> > >> > > emerges later; by then we may have other ways to not starve the
> > >> > > standby tasks, for example, by using dedicated threads for
> > >> > > standby tasks or even consider having higher priority for
> > >> > > standby than active so that we always try to catch up standby
> > >> > > first, then process active; and if the active's lag compared to
> > >> > > the log-end-offset is increasing then we should increase
> > >> > > capacity, etc. etc.
> > >> > >
> > >> > > 4. Actually with KIP-429 this may not be the case: we may not
> > >> > > call onPartitionsRevoked prior to a rebalance any more, so we
> > >> > > would not transition to PARTITIONS_REVOKED, and hence not cause
> > >> > > the state of the instance to be REBALANCING. In other words,
> > >> > > even if an instance is undergoing a rebalance, its state may
> > >> > > still be RUNNING and it may still be processing records at the
> > >> > > same time.
> > >> > >
> > >> > >
> > >> > > On Wed, Aug 7, 2019 at 12:14 PM John Roesler <j...@confluent.io> wrote:
> > >> > >
> > >> > > > Hey Guozhang,
> > >> > > >
> > >> > > > Thanks for the review!
> > >> > > >
> > >> > > > 1. Yes, even with `num.standby.replicas := 0`, we will still
> > >> > > > temporarily allocate standby tasks to accomplish a no-downtime
> > >> > > > task migration. Although, I'd argue that this doesn't really
> > >> > > > violate the config, as the task isn't a true hot standby. As
> > >> > > > soon as it catches up, we'll rebalance again, that task will
> > >> > > > become active, and the original instance that hosted the
> > >> > > > active task will no longer have the task assigned at all. Once
> > >> > > > the stateDirCleaner kicks in, we'll free the disk space from
> > >> > > > it, and return to the steady-state of having just one copy of
> > >> > > > the task in the cluster.
> > >> > > >
> > >> > > > We can of course do without this, but I feel the current
> > >> > > > proposal is operationally preferable, since it doesn't make
> > >> > > > configuring hot-standbys a prerequisite for fast rebalances.
> > >> > > >
> > >> > > > 2. Yes, I think your interpretation is what we intended. The
> > >> > > > default balance_factor would be 1, as it is implicitly today.
> > >> > > > What this does is allow operators to trade off less balanced
> > >> > > > assignments against fewer rebalances. If you have lots of
> > >> > > > spare capacity in your instances, this may be a perfectly fine
> > >> > > > tradeoff, and you may prefer for Streams not to bother
> > >> > > > streaming GBs of data from the broker in pursuit of perfect
> > >> > > > balance. Not married to this configuration, though. It was
> > >> > > > inspired by the related-work research we did.
> > >> > > >
> > >> > > > 3. I'll take a look
> > >> > > >
> > >> > > > 3a. I think this is a good idea. I'd classify it as a type of
> > >> > > > grey failure detection. It may make more sense to tackle grey
> > >> > > > failures as part of the heartbeat protocol (as I POCed here:
> > >> > > > https://github.com/apache/kafka/pull/7096/files). WDYT?
> > >> > > >
> > >> > > > 4. Good catch! I didn't think about that before. Looking at it
> > >> > > > now, though, I wonder if we're actually protected already. The
> > >> > > > stateDirCleaner thread only executes if the instance is in
> > >> > > > RUNNING state, and KIP-441 proposes to use "probing rebalances"
> > >> > > > to report task lag. Hence, during the window between when the
> > >> > > > instance reports a lag and the assignor makes a decision about
> > >> > > > it, the instance should remain in REBALANCING state, right? If
> > >> > > > so, then this should prevent the race condition. If not, then
> > >> > > > we do indeed need to do something about it.
> > >> > > >
> > >> > > > 5. Good idea. I think that today, you can only see the
> > >> > > > consumer lag, which is a poor substitute. I'll add some metrics
> > >> > > > to the proposal.
> > >> > > >
> > >> > > > Thanks again for the comments!
> > >> > > > -John
> > >> > > >
> > >> > > > On Tue, Aug 6, 2019 at 4:27 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >> > > >
> > >> > > > > Hello Sophie,
> > >> > > > >
> > >> > > > > Thanks for the proposed KIP. I left some comments on the
> > >> > > > > wiki itself, and I think I'm still not very clear on a couple
> > >> > > > > of those:
> > >> > > > >
> > >> > > > > 1. With this proposal, does that mean with
> > >> > > > > num.standby.replicas == 0, we may sometimes still have some
> > >> > > > > standby tasks, which may violate the config?
> > >> > > > >
> > >> > > > > 2. I think I understand the rationale to consider lags that
> > >> > > > > are below the specified threshold to be equal, rather than
> > >> > > > > still considering 5000 better than 5001 -- we do not want to
> > >> > > > > "over-optimize" and potentially fall into endless rebalances
> > >> > > > > back and forth.
> > >> > > > >
> > >> > > > > But I'm not clear about the rationale of the second
> > >> > > > > parameter of
> > >> > > > > constrainedBalancedAssignment(StatefulTasksToRankedCandidates,
> > >> > > > > balance_factor):
> > >> > > > >
> > >> > > > > Does that mean, e.g. with balance_factor of 3, we'd consider
> > >> > > > > two assignments, one resulting in balance_factor 0 and one
> > >> > > > > in balance_factor 3, to be equally optimized assignments and
> > >> > > > > therefore may "stop early"? This was not very convincing to
> > >> > > > > me :P
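> > >> > > > >
> > >> > > > > (To make my reading concrete, I'm imagining the early-stop
> > >> > > > > check as roughly the following -- hypothetical code, not
> > >> > > > > from the KIP:
> > >> > > > >
> > >> > > > >     // accept any assignment already within the threshold
> > >> > > > >     if (computeBalanceFactor(assignment) <= balanceFactor) {
> > >> > > > >         return assignment;
> > >> > > > >     }
> > >> > > > >
> > >> > > > > so with balance_factor = 3, a factor-0 and a factor-3
> > >> > > > > assignment would be indistinguishable to the algorithm.)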
> > >> > > > >
> > >> > > > > 3. There are a couple of minor comments about the algorithm
> > >> > > > > itself, left on the wiki page since they need to refer to
> > >> > > > > exact lines and are better displayed there.
> > >> > > > >
> > >> > > > > 3.a Another wild thought about the threshold itself: today
> > >> > > > > the assignment itself is memoryless, so we would not know if
> > >> > > > > the reported `TaskLag` itself is increasing or decreasing,
> > >> > > > > even if the current value is under the threshold. I wonder
> > >> > > > > if it is worth making it a bit more complicated to track the
> > >> > > > > task lag trend at the assignor? Practically it may not be
> > >> > > > > very uncommon that stand-by tasks are not keeping up due to
> > >> > > > > the fact that other active tasks hosted on the same thread
> > >> > > > > are starving the standby tasks.
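> > >> > > > >
> > >> > > > > A sketch of what tracking the trend could look like (the
> > >> > > > > assignor keeps no such state today, so this is purely
> > >> > > > > hypothetical):
> > >> > > > >
> > >> > > > >     // Remember each task's lag from the previous rebalance and
> > >> > > > >     // flag standbys whose lag is growing rather than shrinking.
> > >> > > > >     private final Map<TaskId, Long> previousLags = new HashMap<>();
> > >> > > > >
> > >> > > > >     boolean fallingBehind(final TaskId task, final long reportedLag) {
> > >> > > > >         final Long previous = previousLags.put(task, reportedLag);
> > >> > > > >         return previous != null && reportedLag > previous;
> > >> > > > >     }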
> > >> > > > >
> > >> > > > > 4. There's a potential race condition risk when reporting
> > >> > > > > `TaskLags` in the subscription: right after reporting it to
> > >> > > > > the leader, the cleanup thread kicks in and deletes the
> > >> > > > > state directory. If the task was then assigned to that host,
> > >> > > > > it would have to restore from the beginning, effectively
> > >> > > > > making the seemingly optimized assignment very sub-optimal.
> > >> > > > >
> > >> > > > > To be on the safer side we should consider either pruning
> > >> > > > > out those tasks that are "close to being cleaned up" from
> > >> > > > > the subscription, or delaying the cleanup right after we've
> > >> > > > > included them in the subscription, in case they have been
> > >> > > > > selected as assigned tasks by the assignor.
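> > >> > > > >
> > >> > > > > For the delayed-cleanup option, something like this (a
> > >> > > > > sketch; none of these names exist in the codebase):
> > >> > > > >
> > >> > > > >     // Record when each task dir was last reported in a
> > >> > > > >     // subscription; the cleaner skips it for a grace period.
> > >> > > > >     private final Map<TaskId, Long> reportedAtMs = new ConcurrentHashMap<>();
> > >> > > > >
> > >> > > > >     void onSubscriptionSent(final Set<TaskId> reported, final long nowMs) {
> > >> > > > >         reported.forEach(task -> reportedAtMs.put(task, nowMs));
> > >> > > > >     }
> > >> > > > >
> > >> > > > >     boolean safeToClean(final TaskId task, final long nowMs) {
> > >> > > > >         final Long reported = reportedAtMs.get(task);
> > >> > > > >         return reported == null || nowMs - reported > gracePeriodMs;
> > >> > > > >     }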
> > >> > > > >
> > >> > > > > 5. This is a meta comment: I think it would be helpful to
> > >> > > > > add some user visibility on the standby tasks' lag as well,
> > >> > > > > via metrics for example. Today it is hard for us to observe
> > >> > > > > how far behind our current standby tasks are compared to the
> > >> > > > > active tasks, and whether that lag is increasing or
> > >> > > > > decreasing. As a follow-up task, for example, the rebalance
> > >> > > > > should also be triggered if we realize that some standby
> > >> > > > > task's lag is increasing indefinitely, meaning it cannot
> > >> > > > > keep up (which is another indicator that either you need to
> > >> > > > > add more resources for the standbys or you are still not
> > >> > > > > balanced enough).
> > >> > > > >
> > >> > > > >
> > >> > > > > On Tue, Aug 6, 2019 at 1:32 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> > >> > > > >
> > >> > > > > > Hey all,
> > >> > > > > >
> > >> > > > > > I'd like to kick off discussion on KIP-441, aimed at the
> > >> > > > > > long restore times in Streams during which further active
> > >> > > > > > processing and IQ are blocked. Please give it a read and
> > >> > > > > > let us know your thoughts
> > >> > > > > >
> > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > >> > > > > >
> > >> > > > > > Cheers,
> > >> > > > > > Sophie
> > >> > > > > >
> > >> > > > >
> > >> > > > >
> > >> > > > > --
> > >> > > > > -- Guozhang
> > >> > > > >
> > >> > > >
> > >> > >
> > >> > >
> > >> > > --
> > >> > > -- Guozhang
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> >
>
>
> --
> -- Guozhang
>
