Hi John,

Thanks for the added section. I agree with your reasoning, and I think we can still use the standby replicas now.
Guozhang

On Tue, Aug 20, 2019 at 3:13 PM John Roesler <j...@confluent.io> wrote:

Hi All,

Thanks for the discussion. I've been considering the idea of giving the "catching up" tasks a different name/role. I was in favor initially, but after working through some details, I think it causes some problems, which I've written up in the "rejected alternatives" part of the KIP:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP-441:SmoothScalingOutforKafkaStreams-Addinganewkindoftask(%22moving%22,%22recovering%22,%22learner%22)fortaskmovements

Please give it a read and let me know what you think.

Thanks,
-John

On Thu, Aug 8, 2019 at 7:57 PM Guozhang Wang <wangg...@gmail.com> wrote:

I think I agree with you, Sophie. My gut feeling is that 1) standby tasks not catching up should not be the major concern in the assignor's algorithm, but rather be tackled in different modules, and 2) a lot of optimization can be done at the stream thread itself, like dedicated threading and larger batching, or even complicated scheduling mechanisms between running, restoring, and standby tasks. In any case, I think we can take this out of the scope of KIP-441 for now.

Guozhang

On Thu, Aug 8, 2019 at 5:48 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

> we may have other ways to avoid starving the standby tasks, for example, by using dedicated threads for standby tasks or even consider having *higher priority for standby than active* so that we always try to catch up standbys first, then process active

This is an interesting idea, but seems likely to get in the way of the original idea of this KIP -- if we always process standby tasks first, then if we are assigned a new standby task we will have to wait for it to catch up completely before processing any active tasks! That's even worse than the situation this KIP is trying to help with, since a new standby task has to restore from 0 (whereas an active task at least can take over from wherever the standby was).

During restoration -- while there exist any restoring tasks -- I think it's reasonable to de-prioritize the standby tasks and just process restoring and active tasks so both can make progress. But we should let them catch up afterwards somehow -- maybe we can apply some kind of heuristic, like "if we haven't processed standbys for X iterations, or Y milliseconds, do so now."

Actually, it might even be beneficial to avoid processing standbys a record or two at a time and instead wait for a large enough batch to build up, for the RocksDB bulk-loading benefits.

I think "use dedicated threads for standby" is the more promising end goal, especially since if we split restoration into "restoring tasks" then active and standbys share almost nothing. But that seems like follow-up work to the current KIP :)
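For illustration, the "skipped for X iterations or Y milliseconds" heuristic Sophie describes above, combined with the batching idea, might look like the following sketch. All names and thresholds here are hypothetical -- this is not KIP or Streams code:

    // Hypothetical sketch: defer standby processing while restoring tasks
    // exist, but force a standby pass once we have skipped it for too many
    // iterations or too long, or once a large batch has built up (better
    // for RocksDB bulk loading than a record or two at a time).
    class StandbyScheduler {
        private static final int MAX_SKIPPED_ITERATIONS = 100;  // "X"
        private static final long MAX_SKIPPED_MILLIS = 30_000L; // "Y"
        private static final int MIN_BATCH_SIZE = 1_000;

        private int skippedIterations = 0;
        private long lastStandbyPassMs = System.currentTimeMillis();

        boolean shouldProcessStandbys(final boolean anyRestoringTasks,
                                      final int bufferedStandbyRecords,
                                      final long nowMs) {
            final boolean overdue = skippedIterations >= MAX_SKIPPED_ITERATIONS
                    || nowMs - lastStandbyPassMs >= MAX_SKIPPED_MILLIS;
            final boolean bigBatchReady = bufferedStandbyRecords >= MIN_BATCH_SIZE;
            if (anyRestoringTasks && !overdue && !bigBatchReady) {
                skippedIterations++;
                return false; // let restoring and active tasks make progress
            }
            skippedIterations = 0;
            lastStandbyPassMs = nowMs;
            return true;
        }
    }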
On Thu, Aug 8, 2019 at 5:31 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

Stateful tasks with logging disabled seem to be an interesting edge case. On the one hand, for balancing purposes they should be considered stateful since, as Guozhang pointed out, they are still "heavy" in IO costs. But for "catching up" purposes, i.e. when allocating standby tasks that will become active tasks, they should be considered stateless, as there is no meaningful sense of their lag. We should never allocate standby tasks for them during the first rebalance, but should ensure they are evenly distributed across instances. Maybe we should split these into a third category -- after we assign all stateful tasks with logging, we then distribute the set of logging-disabled stateful tasks to improve balance, before lastly distributing stateless tasks?

This actually leads into what I was just thinking, which is that we really should distinguish the "catch-up" standbys from normal standbys, as well as distinguishing actively processing tasks from active tasks that are still in the restore phase. It's somewhat awkward that today, some active tasks just start processing immediately while others behave more like standby than active tasks for some time, before switching to real active. They first use the restoreConsumer, then later only the "normal" consumer.

However, this restore period is still distinct from normal standbys in a lot of ways -- the code path for restoring is different from the one for updating standbys, for example in how long we block on #poll. So in addition to giving them their own name -- let's go with "restoring task" for now -- they really do seem to deserve being their own distinct task. We can optimize them for efficient conversion to active tasks since we know that's what they will be.

This resolves some of the awkwardness of dealing with the special case mentioned above: we find a balanced assignment of stateful and stateless tasks, and create restoring tasks as needed. If logging is disabled, no restoring task is created.
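For illustration, the three-category ordering Sophie proposes in her first paragraph might be sketched as below. The types and round-robin placement are hypothetical, not the KIP's actual algorithm; a real assignor would also weigh lag when placing the first category:

    // Hypothetical sketch: assign tasks in three passes, so the heavier
    // categories constrain placement first -- stateful-logged tasks (which
    // have meaningful catch-up lag), then logging-disabled stateful tasks
    // (IO-heavy but with no lag), then stateless tasks purely for balance.
    import java.util.*;

    enum TaskKind { LOGGED_STATEFUL, UNLOGGED_STATEFUL, STATELESS }

    final class ThreePassAssignor {
        static Map<String, List<String>> assign(final List<String> instances,
                                                final Map<String, TaskKind> tasks) {
            final Map<String, List<String>> assignment = new HashMap<>();
            for (final String instance : instances) {
                assignment.put(instance, new ArrayList<>());
            }
            for (final TaskKind pass : TaskKind.values()) {
                int i = 0; // round-robin within each category keeps it even
                for (final Map.Entry<String, TaskKind> task : tasks.entrySet()) {
                    if (task.getValue() == pass) {
                        final String target = instances.get(i++ % instances.size());
                        assignment.get(target).add(task.getKey());
                    }
                }
            }
            return assignment;
        }
    }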
On Thu, Aug 8, 2019 at 3:44 PM Guozhang Wang <wangg...@gmail.com> wrote:

Regarding 3) above: I think for active tasks they should still be considered stateful, since the processor would still pay IO cost accessing the store, but they would not have standby tasks?

--
-- Guozhang

On Thu, Aug 8, 2019 at 7:49 AM Bruno Cadonna <br...@confluent.io> wrote:

Hi,

Thank you for the KIP!

Some questions/comments:

1. I am wondering if the "stand-by" tasks that catch up state before the active task is switched deserve their own name in this KIP and maybe in the code. We have already stated that they are not true stand-by tasks: they are not configured through `num.standby.replicas`, and maybe they also have other properties that distinguish them from true stand-by tasks of which we are not aware yet. For example, they may be prioritized differently than other tasks. Furthermore, the name "stand-by" does not really fit with the planned functionality of those tasks. In the following, I will call them false stand-by tasks.

2. Did you consider triggering the probing rebalances not at regular time intervals but when the false stand-by tasks reach an acceptable lag? If you did consider it, could you add a paragraph to the "Rejected Alternatives" section explaining why you rejected this idea?

3. Are tasks that solely contain stores with disabled logging classified as stateful or stateless in the algorithm? I would guess stateless, although if possible they should be assigned to the same instance they had run on before the rebalance. As far as I can see, this special case is not handled in the algorithm.

Best,
Bruno
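For context on the interval-based trigger Bruno asks about in point 2: KIP-441 as eventually shipped (Apache Kafka 2.6) exposes both knobs as configuration. A minimal sketch, assuming the released config names probing.rebalance.interval.ms and acceptable.recovery.lag; the values shown are the eventual defaults:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ProbingRebalanceConfigDemo {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kip-441-demo");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Leader re-triggers a rebalance on this interval while
            // warm-up replicas are still catching up.
            props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 10 * 60 * 1000L);
            // A replica within this many records of the active task's
            // changelog end offset counts as "caught up".
            props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);
        }
    }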
On Thu, Aug 8, 2019 at 8:24 AM Guozhang Wang <wangg...@gmail.com> wrote:

1. Sounds good, just wanted to clarify; and it may be worth documenting it, so that users would not be surprised when monitoring their footprint.

2. Hmm, I see... I think the trade-off can be described as "how much imbalance would bother you enough to be willing to pay another rebalance, along with potentially more restoration lag", and the current definition of rebalance_factor can be considered a rough measurement of that imbalance. Of course, one can argue that a finer-grained measurement could be "resource footprint" like CPU / storage of each instance, like we have in Kafka broker auto-balancing tools, but I'd prefer not doing that as part of the library but more as an operational tool in the future. On the other hand, I've seen stateful and stateless tasks having very different load, and sometimes the only bottleneck of a Streams app is just one stateful sub-topology, and whoever gets tasks of that sub-topology becomes a hotspot (that's why our algorithm tries to balance per sub-topology as well), so maybe we can just consider stateful tasks when calculating this factor, as a very brute-force heuristic?

3.a. Thinking about this a bit more, maybe it's better not to try to tackle an unseen enemy just yet, and observe if it really emerges later. By then we may have other ways to avoid starving the standby tasks, for example by using dedicated threads for standby tasks, or even considering higher priority for standby than active so that we always try to catch up standbys first, then process active; and if the active tasks' lag relative to the log-end-offset keeps increasing, then we should increase capacity, etc.

4. Actually, with KIP-429 this may not be the case: we may not call onPartitionsRevoked prior to a rebalance any more, so we would not transition to PARTITIONS_REVOKED, and hence not cause the state of the instance to be REBALANCING. In other words, even if an instance is undergoing a rebalance, its state may still be RUNNING and it may still be processing records at the same time.

--
-- Guozhang

On Wed, Aug 7, 2019 at 12:14 PM John Roesler <j...@confluent.io> wrote:

Hey Guozhang,

Thanks for the review!

1. Yes, even with `num.standby.replicas := 0`, we will still temporarily allocate standby tasks to accomplish a no-downtime task migration. Although, I'd argue that this doesn't really violate the config, as the task isn't a true hot standby. As soon as it catches up, we'll rebalance again, that task will become active, and the original instance that hosted the active task will no longer have the task assigned at all. Once the stateDirCleaner kicks in, we'll free the disk space from it, and return to the steady state of having just one copy of the task in the cluster.

We can of course do without this, but I feel the current proposal is operationally preferable, since it doesn't make configuring hot standbys a prerequisite for fast rebalances.

2. Yes, I think your interpretation is what we intended. The default balance_factor would be 1, as it is implicitly today. What this does is allow operators to trade off less balanced assignments against fewer rebalances. If you have lots of spare capacity in your instances, this may be a perfectly fine tradeoff, and you may prefer for Streams not to bother streaming GBs of data from the broker in pursuit of perfect balance. Not married to this configuration, though. It was inspired by the related-work research we did.

3. I'll take a look.

3a. I think this is a good idea. I'd classify it as a type of grey failure detection. It may make more sense to tackle grey failures as part of the heartbeat protocol (as I POCed here: https://github.com/apache/kafka/pull/7096/files). WDYT?

4. Good catch! I didn't think about that before. Looking at it now, though, I wonder if we're actually protected already. The stateDirCleaner thread only executes if the instance is in RUNNING state, and KIP-441 proposes to use "probing rebalances" to report task lag. Hence, during the window between when the instance reports a lag and the assignor makes a decision about it, the instance should remain in REBALANCING state, right? If so, then this should prevent the race condition (see the sketch after this message). If not, then we do indeed need to do something about it.

5. Good idea. I think that today, you can only see the consumer lag, which is a poor substitute. I'll add some metrics to the proposal.

Thanks again for the comments!
-John
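For illustration of John's point 4, a simplified sketch of such a guard. The StateDirectoryCleaner interface and method names here are hypothetical, not the actual Streams internals:

    import org.apache.kafka.streams.KafkaStreams;

    // Hypothetical cleaner hook -- not the real Streams cleanup code.
    interface StateDirectoryCleaner {
        void cleanRemovedTasks(long cleanupDelayMs);
    }

    final class CleanerGuard {
        // Only sweep unassigned state directories while the instance is
        // stably RUNNING; while REBALANCING (i.e. between reporting task
        // lags in the subscription and receiving the assignment), skip
        // cleanup so the reported directories survive the decision window.
        static void maybeClean(final KafkaStreams streams,
                               final StateDirectoryCleaner cleaner,
                               final long cleanupDelayMs) {
            if (streams.state() == KafkaStreams.State.RUNNING) {
                cleaner.cleanRemovedTasks(cleanupDelayMs);
            }
        }
    }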
On Tue, Aug 6, 2019 at 4:27 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Sophie,

Thanks for the proposed KIP. I left some comments on the wiki itself, and I think I'm still not very clear on a couple of those:

1. With this proposal, does that mean that with num.standby.replicas == 0, we may sometimes still have some standby tasks, which may violate the config?

2. I think I understand the rationale for considering lags that are below the specified threshold to be equal, rather than still considering 5000 to be better than 5001 -- we do not want to "over-optimize" and potentially fall into endless rebalances back and forth (see the sketch after this message).

But I'm not clear about the rationale for the second parameter of constrainedBalancedAssignment(StatefulTasksToRankedCandidates, balance_factor): does that mean, e.g. with a balance_factor of 3, we'd consider two assignments, one resulting in balance_factor 0 and one resulting in balance_factor 3, to be equally optimized assignments and therefore may "stop early"? This was not very convincing to me :P

3. There are a couple of minor comments about the algorithm itself, left on the wiki page since they need to refer to the exact lines and are better displayed there.

3.a. Another wild thought about the threshold itself: today the assignment itself is memoryless, so we would not know whether the reported `TaskLag` is increasing or decreasing, even if the current value is under the threshold. I wonder if it is worthwhile to make the assignor a bit more complicated and track the task-lag trend there? Practically, it may not be very uncommon that stand-by tasks are not keeping up because other active tasks hosted on the same thread are starving them.

4. There's a potential race condition risk when reporting `TaskLags` in the subscription: right after reporting them to the leader, the cleanup thread kicks in and deletes the state directory. If the task was then assigned to that host, it would have to restore from the beginning, effectively making the seemingly optimized assignment very sub-optimal.

To be on the safer side, we should consider either pruning out those tasks that are "close to being cleaned up" from the subscription, or delaying the cleanup right after we've included them in the subscription, in case they have been selected as assigned tasks by the assignor.

5. This is a meta comment: I think it would be helpful to add some user visibility on standby task lag as well, via metrics for example. Today it is hard for us to observe how far our current standby tasks are behind the active tasks, and whether that lag is increasing or decreasing. As a follow-up task, for example, a rebalance should also be triggered if we realize that some standby task's lag is increasing indefinitely, which means it cannot keep up (another indicator that either you need to add more resources for the configured num.standbys, or you are still not balanced enough).

--
-- Guozhang
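A minimal sketch of the threshold idea from Guozhang's point 2, assuming a hypothetical rank function where any lag at or below the acceptable threshold ties at rank 0 -- not the KIP's actual ranking code:

    // Hypothetical sketch: rank a candidate instance by its task lag,
    // treating anything at or below the acceptable threshold as tied.
    // With a threshold of 10,000, lags of 5,000 and 5,001 compare as
    // equal, so the assignor has no reason to keep moving a task between
    // two nearly caught-up candidates.
    static long rank(final long taskLag, final long acceptableLag) {
        return taskLag <= acceptableLag ? 0L : taskLag;
    }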
On Tue, Aug 6, 2019 at 1:32 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

Hey all,

I'd like to kick off discussion on KIP-441, aimed at the long restore times in Streams during which further active processing and IQ are blocked. Please give it a read and let us know your thoughts:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams

Cheers,
Sophie