1) You are right: a thread's restoration phase will not interfere at all with any other thread's normal processing, whether collocated within the same JVM, on the same machine, etc. So you may have a Streams instance in which some threads have already finished restoring and started processing tasks, while other threads are still restoring.
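To illustrate, here is a toy sketch in plain Java (NOT actual Kafka Streams code; the class and thread names are made up) showing that each thread's state gates only its own processing, so RUNNING threads in an instance are not blocked by restoring siblings:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Toy model only -- not Kafka Streams internals. It shows mixed thread
// states within one instance: threads done restoring process records
// while others are still restoring.
public class RestoreIndependenceSketch {
    enum ThreadState { RESTORING, RUNNING }

    public static void main(String[] args) {
        // One instance, four stream threads in mixed states.
        Map<String, ThreadState> threads = Map.of(
                "StreamThread-1", ThreadState.RUNNING,
                "StreamThread-2", ThreadState.RUNNING,
                "StreamThread-3", ThreadState.RESTORING,
                "StreamThread-4", ThreadState.RESTORING);

        // Only a thread's own state matters; restoring siblings
        // do not block the RUNNING ones.
        List<String> processingNow = threads.entrySet().stream()
                .filter(e -> e.getValue() == ThreadState.RUNNING)
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());

        System.out.println("processing now: " + processingNow);
        System.out.println("still restoring: " + (threads.size() - processingNow.size()));
    }
}
```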
Guozhang

On Mon, Feb 25, 2019 at 1:53 PM Adam Bellemare <adam.bellem...@gmail.com> wrote:

> Hi Guozhang -
>
> Thanks for the replies, and for directing me to the existing JIRAs. I
> think that a two-phase rebalance will be quite useful.
>
> 1) For clarity's sake, I should have just asked: when a new thread/node
> is created and tasks are rebalanced, are the state stores on the new
> threads/nodes fully restored during rebalancing, thereby blocking *any
> and all* threads from proceeding with processing until restoration is
> complete? I do not believe that this is the case; rather, only the
> threads assigned the rebalanced tasks will be paused until state store
> restoration is complete.
>
> Thanks for your help - I appreciate you taking the time to reply.
>
> Adam
>
> On Wed, Feb 20, 2019 at 8:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Adam,
> >
> > Sorry for being late replying on this thread; I've put my comments
> > inline below.
> >
> > On Sun, Feb 3, 2019 at 7:34 AM Adam Bellemare <adam.bellem...@gmail.com>
> > wrote:
> >
> > > Hey Folks
> > >
> > > I have a few questions around the operation of stateful processing
> > > while scaling nodes up/down, and a possible KIP in question #4. Most
> > > of them have to do with task processing while state stores are
> > > rebuilt after scaling nodes up.
> > >
> > > Scenario:
> > > Single node/thread, processing 2 topics (10 partitions each):
> > > User event topic (events) - ie: key: userId, value: ProductId
> > > Product topic (entity) - ie: key: ProductId, value: productData
> > >
> > > My topology looks like this:
> > >
> > > KTable productTable = ... //materialize from product topic
> > >
> > > KStream output = userStream
> > >   .map(x => (x.value, x.key)) //Swap the key and value around
> > >   .join(productTable, ...)    //Joiner is not relevant here
> > >   .to(...)                    //Send it to some output topic
> > >
> > > Here are my questions:
> > > 1) If I scale the processing node count up, partitions will be
> > > rebalanced to the new node. Does processing continue as normal on the
> > > original node, while the new node's processing is paused as the
> > > internal state stores are rebuilt/reloaded? From my reading of the
> > > code (and my own experience) I believe this to be the case, but I am
> > > just curious in case I missed something.
> >
> > With 2 topics and 10 partitions each, assuming the default
> > PartitionGrouper is used, there should be a total of 20 tasks (10 tasks
> > for the map, which write to an internal repartition topic, and 10 tasks
> > for doing the join), since these two topics are co-partitioned for the
> > join.
> >
> > For example, task-0 would be processing the join from
> > user-topic-partition-0 and product-topic-partition-0, and so on.
> >
> > With a single thread, all of these 20 tasks will be allocated to this
> > thread, which processes them in an iterative manner. Note that since
> > each join task has its own state store (e.g. product-state-store-0 for
> > task-0, etc.), this thread will host all 10 sets of state stores as
> > well (the 10 mapping tasks have no state stores at all).
> >
> > When you add new threads, either within the same node or on a different
> > node, after the rebalance each thread should be processing 10 tasks,
> > and hence own the corresponding set of state stores. Each new thread
> > will first restore the state stores it is assigned before it starts
> > processing.
> >
> > > 2) What happens to the userStream map task? Will the new node be able
> > > to process this task while the state store is rebuilding/reloading?
> > > My reading of the code suggests that this map process will be paused
> > > on the new node while the state store is rebuilt.
> > > The effect of this is that it will lead to a delay in events reaching
> > > the original node's partitions, which will be seen as late-arriving
> > > events. Am I right in this assessment?
> >
> > Currently the thread will NOT start processing any tasks until ALL
> > stateful tasks complete restoring (stateless tasks, like the map tasks
> > in your example, never need restoration at all). There's an open JIRA
> > for making this customizable, but I cannot find it at the moment.
> >
> > > 3) How does scaling up work with standby state-store replicas? From
> > > my reading of the code, it appears that scaling a node up will result
> > > in a rebalance, with the state assigned to the new node being rebuilt
> > > first (leading to a pause in processing). Following this, the standby
> > > replicas are populated. Am I correct in this reading?
> >
> > Standby tasks run in parallel with active stream tasks: each one simply
> > reads from the changelog topic in real time and populates its standby
> > store replica. When scaling out, instances with standby tasks will be
> > preferred over those that do not have any standby for the task, and
> > hence only a very small amount of data needs to be restored (think: the
> > standby replica of the store is already populated up to offset 90 at
> > the rebalance, while the active task has written to the changelog topic
> > up to log end offset 100, so you only need to restore offsets 90 - 100
> > instead of 0 - 100).
> >
> > > 4) If my reading in #3 is correct, would it be possible to
> > > pre-populate the standby stores on scale-up before initiating
> > > active-task transfer? This would allow seamless scale-up and
> > > scale-down without requiring any pauses for rebuilding state. I am
> > > interested in kicking this off as a KIP if so, but would appreciate
> > > any JIRAs or related KIPs to read up on prior to digging into this.
> >
> > Yes, there has been some discussion about this here:
> > https://issues.apache.org/jira/browse/KAFKA-6145
> >
> > > Thanks
> > >
> > > Adam Bellemare
> >
> > --
> > -- Guozhang

--
-- Guozhang
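The numbers discussed in the thread can be made concrete with a small self-contained Java sketch. This is illustrative arithmetic only, not Kafka Streams internals (all method names here are made up): 2 co-partitioned topics with 10 partitions each and 2 sub-topologies give 20 tasks, and a standby replica caught up to offset 90 cuts the restore work from 100 changelog records to 10.

```java
// Illustrative arithmetic only -- NOT Kafka Streams code. It reproduces
// the numbers from the thread above.
public class ScalingMathSketch {

    // With the default grouper, co-partitioned topics yield one task per
    // partition per sub-topology: 10 map/repartition tasks + 10 join tasks.
    static int totalTasks(int partitions, int subTopologies) {
        return partitions * subTopologies;
    }

    // Records to replay from the changelog when a task moves to an
    // instance whose local store copy is caught up to `checkpoint`.
    static long restoreWork(long changelogEndOffset, long checkpoint) {
        return changelogEndOffset - checkpoint;
    }

    public static void main(String[] args) {
        System.out.println("total tasks = " + totalTasks(10, 2));

        long coldStart = restoreWork(100, 0);   // no standby: replay 0 - 100
        long warmStart = restoreWork(100, 90);  // standby at 90: replay 90 - 100
        System.out.println("cold restore = " + coldStart + " records");
        System.out.println("warm restore = " + warmStart + " records");
    }
}
```

This is the saving that standby replicas (num.standby.replicas > 0) buy during a rebalance: the new owner replays only the changelog tail, not the full store.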