+1 on my end. Looks good to me. Thanks for putting this together, Manasa!
Cheers,
Bharath

On Mon, Feb 6, 2023 at 11:51 PM Jagadish Venkatraman <jagadish1...@gmail.com> wrote:
> Thank you Manasa for the proposal. I reviewed it and it looks good to me. Nice work!
>
> +1 (approve) from my end.
>
> On Mon, Feb 6, 2023 at 11:41 PM Yi Pan <nickpa...@gmail.com> wrote:
> > Hi, Manasa,
> >
> > Sorry for the late reply. The revision lgtm. Thanks for the great work!
> >
> > Best,
> >
> > -Yi
> >
> > On Mon, Jan 30, 2023 at 12:11 PM Lakshmi Manasa <lakshmimanas...@gmail.com> wrote:
> > > Hi Yi,
> > >
> > > I have updated SEP-32 to include all the feedback on the above questions.
> > > Please let me know if there are any follow-up questions.
> > >
> > > Thanks,
> > > Manasa
> > >
> > > On Mon, Jan 23, 2023 at 8:56 AM Lakshmi Manasa <lakshmimanas...@gmail.com> wrote:
> > >> Hi Yi,
> > >>
> > >> Thank you for raising these questions. Please find my answers inline below.
> > >>
> > >> *a) how are states for the virtual tasks managed during split/merge?*
> > >> For this SEP, stateful job elasticity is future work; SEP-32 currently deals only with stateless elasticity.
> > >> The idea for state-preserving elasticity is that only jobs that can guarantee a bijective mapping between state key and input key will be supported.
> > >> This requirement is needed so that when input keys move from one virtual task to another, it is easy to identify which state keys should be present in the store of each virtual task for correct operation.
> > >> Additionally, stateful elasticity is only supported for jobs that rely on the blob store for backup and restore.
> > >> Furthermore, for stateful jobs the elasticity factor is increased or decreased only in steps of 2.
> > >> With these restrictions in place, when a job starts with elasticity factor 2, the state blob of the original task is copied to both virtual tasks during a split.
> > >> For a merge, when two virtual tasks merge into one (virtual or original) task, the state blob of the new task will need to be stitched together from the older blobs.
> > >> This will be done by leveraging the bijective mapping between state key and input key, which determines, for each state key in the new blob, which older blob its value should come from
> > >> (each older blob belonged to a virtual task that consumed an input key based on the keyBucket of that virtual task).
> > >> That said, the design for stateful elasticity needs more work and is planned for a follow-up SEP; the current SEP-32 focuses only on stateless jobs.
> > >>
> > >> *b) what's perf impact when we have 2 virtual tasks on the same SSP in the same container, while one virtual task is much faster than the other?*
> > >> The SystemConsumer subscribes to the input system at the partition level.
> > >> Because of this, even if one v.task is much faster than the other, since both consume the same SSP, the system consumer of a container will fetch more messages only once the entire SSP buffer is empty.
> > >> This means that even though one v.task is much faster, the perf will be determined by the slower v.task.
> > >> However, this is no worse than the pre-elastic job perf, and if the number of containers is increased, the fast v.task can improve perf when the slow and fast v.tasks are in different containers (different system consumers).
> > >>
> > >> *c) what's the reason that a virtual task can not filter older messages from a previous offset, in case the container restarts from a smaller offset from another virtual task consuming the same SSP?*
> > >> IIUC, this question is about a container with two v.tasks that committed checkpoints for an SSP, where the fast v.task committed a newer offset and the slow v.task committed an older offset.
> > >> In this scenario, the SEP says there could be duplicate processing, as the SystemConsumer will start consuming from the older offset for the SSP.
> > >> Yes, an improvement can be made so that the v.task that committed the newer offset starts processing only from the offset after its checkpoint and filters out older messages.
> > >>
> > >> *d) how do we compare this w/ an alternative idea that implements a KeyedOrderedExecutor w/ multiple parallel threads within the single task's main event loop to increase the parallelism?*
> > >> Is this similar to the per-key parallelism option (in the rejected solutions section), with the difference that the number of threads is fixed for a single task (as opposed to one thread per key in the rejected solution)?
> > >> This KeyOrderedExecutor is better than the parallelism the current task.max.concurrency offers, as it gives in-order execution per key.
> > >> However, with the KeyOrderedExecutor solution the number of containers will still be <= the number of tasks.
> > >> This means:
> > >> (a) to increase throughput for one key, all other keys must also be processed faster (this is partially present in elasticity too, as seen in question (b) above, but it can be combated with a larger elasticity factor and more containers);
> > >> (b) network, disk, and I/O contention will be larger than with elasticity: virtual tasks can be spread across hosts, whereas in a key-ordered executor all keys (a single task) sit on the same host, so the increased throughput increases the load on that host; and
> > >> (c) if one or more of the parallel units (threads here) needs more resources, the result is a large container, which makes scheduling harder because finding large chunks of resources in a cluster takes longer, whereas with virtual tasks we can have smaller containers.
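The stitching step described in answer (a) can be illustrated with a minimal Java sketch. It assumes, purely for illustration, that all virtual tasks of one original task are merged back into a single store, that stores are plain in-memory maps, and that a virtual task owns key bucket `floorMod(inputKey.hashCode(), elasticityFactor)`. `StateMergeSketch`, `keyBucket` and `merge` are hypothetical names, not Samza APIs, and SEP-32 itself defers the stateful design to a follow-up SEP.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: stitch the store of a merged task from the blobs of its virtual tasks.
// ASSUMPTION (not from SEP-32): virtual task b owns bucket floorMod(inputKey.hashCode(), factor) == b.
public final class StateMergeSketch {

  private StateMergeSketch() {}

  // Hypothetical key-bucket function used only for this illustration.
  public static int keyBucket(String inputKey, int elasticityFactor) {
    return Math.floorMod(inputKey.hashCode(), elasticityFactor);
  }

  // oldStores.get(b) is the restored blob of the virtual task that owned bucket b;
  // stateKeyToInputKey is the bijective state-key -> input-key mapping the job must guarantee.
  public static Map<String, String> merge(
      List<Map<String, String>> oldStores, Function<String, String> stateKeyToInputKey) {
    int oldFactor = oldStores.size();
    Map<String, String> merged = new HashMap<>();
    for (int bucket = 0; bucket < oldFactor; bucket++) {
      for (Map.Entry<String, String> entry : oldStores.get(bucket).entrySet()) {
        String inputKey = stateKeyToInputKey.apply(entry.getKey());
        // After a split each virtual task started from a full copy of the parent blob, so it
        // also holds stale values for keys it never consumed; keep only the owner's value.
        if (keyBucket(inputKey, oldFactor) == bucket) {
          merged.put(entry.getKey(), entry.getValue());
        }
      }
    }
    return merged;
  }
}
```

Choosing values by ownership rather than, say, last-writer-wins is what the bijective mapping buys: a key that its owner deleted is not resurrected from a sibling's stale copy of the parent blob.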
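The improvement acknowledged in answer (c) could look roughly like the sketch below: the shared consumer still rewinds the SSP to just after the oldest checkpoint, but each virtual task skips offsets at or below its own checkpoint. It assumes numeric, monotonically increasing offsets (as in Kafka) and that a checkpoint records the last processed offset; Samza offsets are opaque strings in general, so a real version would compare them through the input system's own offset comparison. `VirtualTaskOffsetFilter` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-virtual-task filtering after a restart.
// ASSUMPTIONS: numeric, increasing offsets; a checkpoint stores the last processed offset.
public final class VirtualTaskOffsetFilter {
  // Last checkpointed offset of each virtual task consuming the same SSP.
  private final Map<String, Long> lastCheckpointed = new HashMap<>();

  public void restore(String virtualTask, long checkpointedOffset) {
    lastCheckpointed.put(virtualTask, checkpointedOffset);
  }

  // The shared consumer must rewind to just after the oldest checkpoint (0 if none exist).
  public long startingOffset() {
    return lastCheckpointed.values().stream().mapToLong(Long::longValue).min().orElse(-1L) + 1;
  }

  // True if this virtual task has not processed the offset yet; false means the message is a
  // replay caused by a slower sibling's older checkpoint and can be skipped.
  public boolean shouldProcess(String virtualTask, long offset) {
    Long checkpointed = lastCheckpointed.get(virtualTask);
    return checkpointed == null || offset > checkpointed;
  }
}
```

This removes duplicate processing for the faster virtual task, but the container still re-reads the range between the two checkpoints, because fetching remains per SSP.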
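For comparison with answer (d), a key-ordered executor of the kind Yi describes can be sketched with a fixed number of single-threaded lanes and keys routed to lanes by hash, which keeps per-key order while different keys run concurrently. This is a generic illustration built on `java.util.concurrent`, not Samza code; `KeyOrderedExecutor` here is a hypothetical class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: every message for a given key goes to the same single-threaded lane,
// so one key's messages run in submission order while different keys may run in parallel.
public final class KeyOrderedExecutor implements AutoCloseable {
  private final List<ExecutorService> lanes = new ArrayList<>();

  public KeyOrderedExecutor(int parallelism) {
    for (int i = 0; i < parallelism; i++) {
      lanes.add(Executors.newSingleThreadExecutor());
    }
  }

  public Future<?> submit(Object key, Runnable work) {
    // floorMod keeps the lane index non-negative for negative hash codes.
    int lane = Math.floorMod(key.hashCode(), lanes.size());
    return lanes.get(lane).submit(work);
  }

  // Stops accepting work and waits (up to a minute per lane) for queued work to finish.
  @Override
  public void close() throws InterruptedException {
    for (ExecutorService lane : lanes) {
      lane.shutdown();
    }
    for (ExecutorService lane : lanes) {
      lane.awaitTermination(1, TimeUnit.MINUTES);
    }
  }
}
```

All lanes live in one JVM, so this raises parallelism inside a single task's container without letting keys spread across hosts or into smaller containers, which is the gist of points (b) and (c) in answer (d).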
> > >>
> > >> Please let me know if the above answers make sense and if there are any follow-ups for this SEP.
> > >>
> > >> On Thu, Jan 19, 2023 at 10:33 PM Yi Pan <nickpa...@gmail.com> wrote:
> > >>> Hey, Manasa,
> > >>>
> > >>> Sorry to chime in late. A few questions:
> > >>> a) how are states for the virtual tasks managed during split/merge?
> > >>> b) what's perf impact when we have 2 virtual tasks on the same SSP in the same container, while one virtual task is much faster than the other?
> > >>> c) what's the reason that a virtual task can not filter older messages from a previous offset, in case the container restarts from a smaller offset from another virtual task consuming the same SSP?
> > >>> d) how do we compare this w/ an alternative idea that implements a KeyedOrderedExecutor w/ multiple parallel threads within the single task's main event loop to increase the parallelism?
> > >>>
> > >>> Best,
> > >>>
> > >>> -Yi
> > >>>
> > >>> On Thu, Jan 19, 2023 at 3:26 PM Lakshmi Manasa <lakshmimanas...@gmail.com> wrote:
> > >>> > Hi all,
> > >>> >
> > >>> > If there are no concerns or questions about this SEP, I shall start the vote email thread tomorrow.
> > >>> >
> > >>> > Thanks,
> > >>> > Manasa
> > >>> >
> > >>> > On Fri, Jan 6, 2023 at 8:08 AM Lakshmi Manasa <lakshmimanas...@gmail.com> wrote:
> > >>> > > Hi all,
> > >>> > > We created SEP-32: Elasticity for Samza.
> > >>> > >
> > >>> > > Please find the SEP here (https://cwiki.apache.org/confluence/display/SAMZA/SEP-32%3A+Elasticity+for+Samza).
> > >>> > > Please take a look and provide feedback.
> > >>> > > Thanks,
> > >>> > > Manasa
>
> --
> Jagadish