@Dong, thanks for the updates. +1
On Thu, Jun 22, 2017 at 3:36 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Yi,
>
> Thanks for the detailed comment and the summary!
>
> To address your comments:
>
> 1) The current names are GroupByPartitionWithFixedTaskNum and
> GroupBySystemStreamPartitionWithFixedTaskNum. Instead of
> FixedTasksGroupByPartition and FixedTasksGroupBySystemStreamPartition,
> how about GroupByPartitionFixedTasks and
> GroupBySystemStreamPartitionFixedTasks? The new names are as long as the
> names you suggested. They seem a bit more intuitive because they are
> prefixed with the grouper class name of their no-fixed-tasks counterpart.
> I have updated the wiki with the new names. Can you let me know if it is OK?
>
> 2) Initially I wanted to design that config and interface later, when we
> have more use-cases, so that we can have higher confidence in the
> interface design. But it seems that one common concern with the proposal
> is the limiting assumption it makes about the
> old-partition-to-new-partition mapping. I have updated the wiki to
> illustrate the design of this interface and the new (and more general)
> assumption the input system must satisfy to use this partition expansion.
> Can you take a look and see if it is reasonable?
>
> 3) Yeah, previously Jacob raised the same concern and the solution is
> exactly the same as you suggested.
>
> Hey everyone,
>
> I have made a non-trivial change to the wiki to illustrate the use of the
> new config and interface for users to specify the
> new-partition-to-old-partition mapping. Can you please help review it?
>
> Thanks,
> Dong
>
>
> On Thu, Jun 22, 2017 at 2:25 AM, Yi Pan <nickpa...@gmail.com> wrote:
>
> > Hi, Dong and everyone,
> >
> > Thanks for the detailed discussion on SEP-5! Really appreciate the
> > thorough consideration on this issue.
> > I also noticed that Dong has updated the SEP-5 wiki to clarify:
> > 1) SEP-5 provides a solution to retain the same number of tasks/states
> > w/o re-partitioning (as illustrated in the stateful join example)
> > 2) Future work to expand the number of tasks needs to work together with
> > flexible re-partitioning to provide a complete solution
> >
> > Due to the cost to be paid in task number expansion:
> > 1) additional network I/O and latency in re-partitioning
> > 2) shuffling of the states among tasks
> > the current form of SEP-5 provides an alternative when partition
> > expansion in the messaging system is not due to an increase of the total
> > input rate.
> >
> > The concern about the added complexity in grouper logic is valid.
> > However, the grouper-based solution is not completely unreasonable:
> > 1) Grouper is a public interface and we are already open to customized
> > implementations of groupers, although that is not a main use case
> > 2) Deprecating the existing config-driven grouper is a longer-term
> > effort: it has to wait until the fluent API has a better planner that
> > automatically figures out the grouper to be used, and until stateful
> > task expansion is automated. Hence, for the foreseeable future, the
> > grouper is still configured by the user.
> >
> > So, in general, I am in favor of the proposed SEP-5, given that it
> > provides a path of least resistance to address some pain points for
> > Samza users, w/o breaking any existing use cases, in opt-in mode.
> >
> > Some minor suggestions:
> > 1) The class names are too long. Can we change them to
> > FixedTasksGroupByPartition and FixedTasksGroupBySystemStreamPartition?
> > 2) I am still in favor of a configurable partition expansion (i.e.
> > new<->old partition mapping) policy, since it makes this solution more
> > general and not fixed for Kafka. I am OK with defaulting to the
> > power-of-2 expansion policy and not introducing a new config variable now.
> > 3) In the checkpoint/coordinator topic validation, the
> > KafkaCheckpointLogKey class validates that the current grouper factory
> > class == the grouper factory class in the previous checkpoint. We need to
> > make sure that we allow the compatible change from GroupByPartition to
> > FixedTasksGroupByPartition, etc. Since FixedTasksGroupByPartition is a
> > derived interface of GroupByPartition, one possible solution is to check
> > assignability (i.e. whether the current grouper factory class is
> > assignable to the previous grouper factory class).
> >
> > Thanks a lot!
> >
> > On Wed, Jun 21, 2017 at 5:11 PM, Navina Ramesh (Apache)
> > <nav...@apache.org> wrote:
> >
> > > > But IMO it is the best available solution towards the support of
> > > > partition expansion in comparison to the alternatives, no?
> > >
> > > At this time, relative to the other alternatives you have listed, this
> > > is a path of least effort to solving this problem. I agree to that. :)
> > >
> > > > I can merge those two sections or update the statement if the current
> > > > statement has not clearly explained the reason of partition expansion
> > > > in Kafka.
> > >
> > > Given the significance of what you are actually trying to solve, I
> > > think it will be better to have it in points. Let me come find you and
> > > we can update it.
> > >
> > > > I have updated wiki and added the task expansion to the Future Work
> > > > section. On the other hand I still keep it in the Rejected Alternative
> > > > section to explain why this future work does not replace the existing
> > > > proposal in SEP-5. Does this sound reasonable?
> > >
> > > It is very confusing to me how the same point can be under "Future
> > > Work" and "Rejected Alternative". There is no question about the future
> > > work *replacing* SEP-5. Iiuc, this SEP is a subset of the partition
> > > expansion solution. So, I don't think increasing task count should be a
> > > rejected alternative.
> > >
> > > > I am also not sure why a feature needs to be "utmost priority" in
> > > > order to be accepted. Can you explain a bit on that?
> > >
> > > I don't think I ever claimed that the feature needs to be of "utmost
> > > priority" to be accepted. I was just stating my opinion.
> > >
> > >
> > > Thanks!
> > > Navina
> > >
> > > On Wed, Jun 21, 2017 at 3:52 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Thanks much for the reply Navina. Please see my reply inline.
> > > >
> > > > On Wed, Jun 21, 2017 at 2:57 PM, Navina Ramesh (Apache)
> > > > <nav...@apache.org> wrote:
> > > >
> > > > > Thanks to Jake, Dong and Kartik for keeping the discussion going.
> > > > >
> > > > > > Here are the pros and cons of the extra re-partitioning stage in
> > > > > > comparison to SEP-5.
> > > > >
> > > > > I think that is a good summary of the pros/cons of the
> > > > > repartitioning-stage-based solution. Can you please include it in
> > > > > your SEP? It seems like you already have access. If you are still
> > > > > unable to access the wiki page, feel free to walk over to the Samza
> > > > > area and find me!
> > > >
> > > > Sure. I have added this summary to the Alternative Section.
> > > >
> > > > > > I think there is always a way for user to mess up their job if
> > > > > > they configure the Samza job incorrectly.
> > > > >
> > > > > I don't think Jake or anyone is arguing about an "incorrectly"
> > > > > configured Samza job. The question was about how easy/difficult it
> > > > > is for users to *not mess up* their job with incorrect
> > > > > configurations.
> > > > >
> > > > > > I also think the assumption made in this SEP is not particularly
> > > > > > harder to understand than other existing configs in Samza.
> > > > >
> > > > > I disagree here. Other configs don't require you to understand more
> > > > > than one assumption.
> > > > >
> > > > > There is already an overload of configs in Samza and I think we are
> > > > > trying to shield users from it as much as possible (esp. with the
> > > > > fluent API). More specifically, we don't want the user to know about
> > > > > the internals of Samza such as the SSP grouper, the task name
> > > > > grouper, etc. Since the proposed solution makes the configuration
> > > > > more complex to understand, it *is* a burden on the user.
> > > > >
> > > > > Just because configs are the way they are, it doesn't mean we should
> > > > > increase their complexity and push the burden on users to manage it
> > > > > correctly. My two cents.
> > > >
> > > > Sure, I agree the proposal requires users to understand the assumption
> > > > in order to expand the partitions of the topic. But whether the added
> > > > complexity is acceptable or not is very subjective. If there is a
> > > > better way to allow users to expand partitions of the input stream
> > > > without making assumptions, then we can just do that. The current
> > > > solution is not perfect. But IMO it is the best available solution
> > > > towards the support of partition expansion in comparison to the
> > > > alternatives, no?
> > > >
> > > > > Here are a few things that I believe are needed for wrapping up the
> > > > > SEP:
> > > > >
> > > > > 1. For the longest time, I thought partition expansion happens in
> > > > > Kafka only when the volume of messages across partitions is too
> > > > > high. Based on this assumption, I would only assume that re-mapping
> > > > > expanded partitions to the same task will have an adverse effect on
> > > > > the throughput/resource utilization of the processor/container in
> > > > > Samza (for example, disk utilization may increase significantly.
> > > > > With disk quota throttling, it could cause the processor to drop.).
> > > > > However, after speaking with Xinyu, it turns out that partition
> > > > > expansion also happens when there is a per-partition data retention
> > > > > limit imposed by Kafka (not sure if it is only in LinkedIn or in
> > > > > Kafka open-source as well). Imo, this is the primary use-case that
> > > > > we are trying to solve for in Samza and it is not very obvious from
> > > > > the SEP.
> > > > > @Dong, can you please explain *the circumstances under which
> > > > > partition expansion can happen*, under the "Motivation" section? I
> > > > > disagree with the current motivation described as -> "This design
> > > > > doc provides a solution to increase partition number of the input
> > > > > streams of a stateful Samza job while still ensuring the correctness
> > > > > of Samza job output."
> > > > > This is a solution, albeit not fully done through this SEP alone.
> > > >
> > > > This is actually already described in the Problem and Goal section,
> > > > i.e. "For example, Kafka generally needs to limit the maximum size of
> > > > each partition to scale up its performance. Thus the number of
> > > > partitions of a Kafka topic needs to be expanded to reduce the
> > > > partition size if the average byte-in-rate or retention time of the
> > > > Kafka topic has doubled". I can merge those two sections or update the
> > > > statement if the current statement has not clearly explained the
> > > > reason of partition expansion in Kafka.
> > > >
> > > > > 2. I think we are in consensus about the fact that increasing the
> > > > > task number and handling the state correctly is a good solution for
> > > > > Samza in the long-run. In your rejected alternatives, you mention
> > > > > "However, this feature alone does not solve the problem of allowing
> > > > > partition expansion.". What else is required to allow partition
> > > > > expansion?
> > > > > Can you please elaborate on that in point #1 of the rejected
> > > > > alternatives? If there is still more work to be done to support
> > > > > partition expansion in Samza, it is worthwhile to mention it under
> > > > > *Future Work*, instead of under "Rejected Alternatives". Perhaps you
> > > > > were waiting for edit permissions to the wiki. Please make this
> > > > > change so it is well-tracked.
> > > >
> > > > I thought this is already explained in the rejected alternative
> > > > section. More specifically, it is said that "However, this feature
> > > > alone does not solve the problem of allowing partition expansion. For
> > > > example, say we have a job that joins two streams both of which have 3
> > > > partitions. If partition number of one stream increases from 3 to 6,
> > > > we would still want the task number to remain 3 to make sure that
> > > > messages with the same key from both streams will be handled by the
> > > > same task. This needs to be done with the new grouper classes proposed
> > > > in this doc."
> > > >
> > > > Does this explanation make sense?
> > > >
> > > > I have updated wiki and added the task expansion to the Future Work
> > > > section. On the other hand I still keep it in the Rejected Alternative
> > > > section to explain why this future work does not replace the existing
> > > > proposal in SEP-5. Does this sound reasonable?
> > > >
> > > > > I am still not totally crazy about the proposed solution because it
> > > > > is not clear for open-source, who or which use-cases stand to
> > > > > benefit. I am not convinced that this problem is of utmost priority
> > > > > for the Samza community *at this point of time*.
> > > >
> > > > I think the Problem and Goal section and the Motivation section have
> > > > illustrated the use-case for this feature.
> > > > Let me answer your questions more specifically:
> > > >
> > > > *Who will benefit from this feature:* any Samza user who runs a
> > > > stateful job with input from Kafka and needs to expand the partitions
> > > > of the input stream so that the single partition size doesn't exceed a
> > > > threshold.
> > > >
> > > > *Which use-cases stand to benefit:* SEP-5 is useful if a user runs a
> > > > stateful job with input from Kafka and needs to expand the partitions
> > > > of the input stream so that the single partition size doesn't exceed a
> > > > threshold.
> > > >
> > > > *Why it is an important feature:* a user needs this feature if they
> > > > run a stateful job with input from Kafka and the partition size in
> > > > Kafka has become too large due to an increase in throughput or an
> > > > increase in retention time.
> > > >
> > > > I am not sure what kind of feature can be classified as "utmost
> > > > priority". I am also not sure why a feature needs to be "utmost
> > > > priority" in order to be accepted. Can you explain a bit on that? I
> > > > think we should develop features that have a valid use-case.
> > > >
> > > > > I am on the same page as Jake on this one. Not a +1, just a 0 (if
> > > > > that even matters).
> > > > >
> > > > > Thanks!
> > > > > Navina
> > > > >
> > > > > On Sun, Jun 18, 2017 at 12:04 AM, Dong Lin <lindon...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > BTW, I will update the SEP-5 wiki with our latest discussion after
> > > > > > I have got the wiki edit access.
> > > > > >
> > > > > > On Sat, Jun 17, 2017 at 11:36 PM, Dong Lin <lindon...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks everyone for the comment!
> > > > > > >
> > > > > > > I am currently leaning towards the current approach.
> > > > > > > I think Kartik raised a good point that the extra
> > > > > > > repartitioning stage will also incur additional throughput on
> > > > > > > Kafka in addition to the potential storage cost. Can other Samza
> > > > > > > developers also chime in and provide your opinions on this
> > > > > > > proposal?
> > > > > > >
> > > > > > > Since this discussion thread has been open for three weeks, I
> > > > > > > will initiate a voting thread on Monday if there is no major
> > > > > > > revision suggestion.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Jun 15, 2017 at 6:32 PM, Kartik Paramasivam
> > > > > > > <kparamasi...@linkedin.com.invalid> wrote:
> > > > > > >
> > > > > > >> Great discussion!
> > > > > > >>
> > > > > > >> Here are some more thoughts.
> > > > > > >>
> > > > > > >> The point that repartitioning is a more general purpose solution
> > > > > > >> is surely spot on. For many source systems (Kinesis, Google
> > > > > > >> Pub-Sub, any of the older queuing systems such as RabbitMQ,
> > > > > > >> etc.), repartitioning is anyway functionally required to do even
> > > > > > >> simple keyed aggregations. But in most of these systems, the
> > > > > > >> concept of repartitioning either does not exist or exists in a
> > > > > > >> way which is very unique (e.g. Kinesis).
> > > > > > >>
> > > > > > >> I think this feature is really only interesting for source
> > > > > > >> systems like Kafka and EventHub. EventHub (last I checked) didn't
> > > > > > >> support repartitioning. So this is probably not super-interesting
> > > > > > >> (yet) for EventHub.
> > > > > > >>
> > > > > > >> So Kafka is clearly the main use case here.
> > > > > > >>
> > > > > > >> For Kafka, I think it is pretty rare for people to customize the
> > > > > > >> hashing algorithm for sending messages. I would argue that less
> > > > > > >> than 5% of the population (I am being generous ;)) would do that.
> > > > > > >> The current proposal works with the default hashing scheme for
> > > > > > >> Kafka. So organizations will typically never have to coordinate.
> > > > > > >>
> > > > > > >> If the proposed alternative (always repartition) was side-effect
> > > > > > >> free, then it would make sense to use an alternative design that
> > > > > > >> would work for 100% of the population. Repartitioning all input
> > > > > > >> would however not be a feasible solution (at least at LinkedIn)
> > > > > > >> as it would double the Kafka workload. If many Samza jobs read
> > > > > > >> from Kafka topics, then the increase would be a function of the
> > > > > > >> number of Samza jobs.
> > > > > > >>
> > > > > > >> For low throughput Kafka topics, surely explicit repartitioning
> > > > > > >> using the fluent API is feasible.
> > > > > > >>
> > > > > > >> If the proposal was to make this new policy the default then that
> > > > > > >> would clearly not make much sense.
> > > > > > >>
> > > > > > >> But it is an opt-in policy. If it is not applicable, people don't
> > > > > > >> have to use it.
> > > > > > >>
> > > > > > >> I do have some questions about the implementation. I will try to
> > > > > > >> respond back after spending some more time on this.
> > > > > > >>
> > > > > > >>
> > > > > > >>
> > > > > > >> On Thu, Jun 15, 2017 at 7:53 AM, Jacob Maes <jacob.m...@gmail.com>
> > > > > > >> wrote:
> > > > > > >>
> > > > > > >> > Thanks, Dong.
> > > > > > >> >
> > > > > > >> > The summary looks accurate.
> > > > > > >> >
> > > > > > >> > I'll let the others chime in, as I believe my perspective has
> > > > > > >> > been adequately captured in this thread.
> > > > > > >> >
> > > > > > >> > -Jake
> > > > > > >> >
> > > > > > >> > On Wed, Jun 14, 2017 at 12:12 PM, Dong Lin <lindon...@gmail.com>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > Hey Jacob,
> > > > > > >> > >
> > > > > > >> > > Thank you for taking so much time to discuss with me! I
> > > > > > >> > > appreciate the discussion and the insight. I will summarize
> > > > > > >> > > our discussion below.
> > > > > > >> > >
> > > > > > >> > > 1) Whether it is reasonable to store the partition-to-task
> > > > > > >> > > mapping.
> > > > > > >> > >
> > > > > > >> > > We agree that this partition-to-task mapping will be
> > > > > > >> > > reasonable if we allow users to specify either the
> > > > > > >> > > new-partition-to-old-partition mapping or the
> > > > > > >> > > key-to-partition mapping in the future. SEP-5 doesn't
> > > > > > >> > > currently provide a way for users to specify the
> > > > > > >> > > new-partition-to-old-partition mapping because we don't have
> > > > > > >> > > a good idea about that interface until we try to enable
> > > > > > >> > > partition expansion for input systems other than Kafka in
> > > > > > >> > > the future. This is currently specified as the third future
> > > > > > >> > > work in SEP-5.
> > > > > > >> > >
> > > > > > >> > > And if we decide to implement SEP-5, I will include a
> > > > > > >> > > warning message regarding the use of the partition-to-task
> > > > > > >> > > mapping, i.e. "this does not specify the key-to-task
> > > > > > >> > > mapping". We agree that this could address the concern here.
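
To make point 1 above concrete, here is a minimal Java sketch of why a fixed task count can keep a key on the same task after partition expansion. It is an illustration under stated assumptions, not Samza or Kafka code: the class and method names are hypothetical, `Math.floorMod` on an integer hash stands in for "hash(key) mod partition count", and the task for a partition is assumed to be the partition number modulo the fixed task count.

```java
final class PartitionToTaskSketch {
    // Hypothetical producer-side rule: partition = hash(key) mod partition count.
    static int partitionFor(int keyHash, int partitionCount) {
        return Math.floorMod(keyHash, partitionCount);
    }

    // Hypothetical fixed-task grouping: fold a partition onto one of a fixed
    // number of tasks (the task count is the original partition count).
    static int taskFor(int partition, int fixedTaskCount) {
        return partition % fixedTaskCount;
    }

    // True when a key lands on the same task before and after the expansion.
    static boolean sameTaskAfterExpansion(int keyHash, int oldCount, int newCount) {
        int before = taskFor(partitionFor(keyHash, oldCount), oldCount);
        int after = taskFor(partitionFor(keyHash, newCount), oldCount);
        return before == after;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"alice", "bob", "carol"}) {
            int hash = key.hashCode();
            System.out.println(key + ": 3 -> 6 keeps the task? "
                + sameTaskAfterExpansion(hash, 3, 6));
        }
    }
}
```

Under these assumptions the property holds whenever the new partition count is an integer multiple of the old one (for example 3 to 6). It can fail otherwise (for example 3 to 5), which is the kind of caveat the warning message in point 1 is meant to cover.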
> > > > > > >> > >
> > > > > > >> > > 2) Whether we should follow the approach in SEP-5 or use an
> > > > > > >> > > extra re-partitioning stage in the stateful Samza job to
> > > > > > >> > > enable partition expansion.
> > > > > > >> > >
> > > > > > >> > > Here are the pros and cons of the extra re-partitioning stage
> > > > > > >> > > in comparison to SEP-5.
> > > > > > >> > >
> > > > > > >> > > Pros:
> > > > > > >> > > - It doesn't require the owner of the Samza job to know the
> > > > > > >> > > partitioning algorithm used for the input stream. If the
> > > > > > >> > > owner of the Samza job is in a different organization than
> > > > > > >> > > the producer of the input stream, this solution frees
> > > > > > >> > > different organizations from having to coordinate with each
> > > > > > >> > > other.
> > > > > > >> > > - It doesn't require the owner of the Samza job to specify
> > > > > > >> > > the partitioning algorithm used for the input stream. Thus
> > > > > > >> > > less config.
> > > > > > >> > >
> > > > > > >> > > Cons:
> > > > > > >> > > - Users have to make code changes on their side to use the
> > > > > > >> > > new fluent API.
> > > > > > >> > > - The extra partitioning stage would potentially increase
> > > > > > >> > > latency.
> > > > > > >> > > - The extra partitioning stage would incur additional cost
> > > > > > >> > > due to the extra internal topic. The cost is probably not
> > > > > > >> > > that much with the new trim() API in Kafka if Samza uses
> > > > > > >> > > Kafka to store the internal topic. But the cost may be
> > > > > > >> > > doubled if Samza uses another input system that doesn't
> > > > > > >> > > provide a trim() API to delete data on demand.
> > > > > > >> > >
> > > > > > >> > > My recommendation is to adopt a hybrid solution, i.e. we
> > > > > > >> > > still implement the current proposal in SEP-5 so that we
> > > > > > >> > > enable partition expansion without incurring extra
> > > > > > >> > > latency/cost and without requiring users to change their
> > > > > > >> > > code. And we can recommend that users use the extra
> > > > > > >> > > partitioning stage if the coordination among different
> > > > > > >> > > organizations is indeed a concern.
> > > > > > >> > >
> > > > > > >> > > Can other developers also provide feedback regarding your
> > > > > > >> > > preference between the two?
> > > > > > >> > >
> > > > > > >> > > Thanks,
> > > > > > >> > > Dong
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes
> > > > > > >> > > <jacob.m...@gmail.com> wrote:
> > > > > > >> > >
> > > > > > >> > > > Hey Dong,
> > > > > > >> > > >
> > > > > > >> > > > I appreciate your thoughtful responses. Let's do one more
> > > > > > >> > > > round :-)
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > > Here are my current concern with the three alternatives
> > > > > > >> > > > > you described earlier:
> > > > > > >> > > > > - The first alternative requires support from input
> > > > > > >> > > > > system which is currently not available. It will limit
> > > > > > >> > > > > the usage of partition expansion to only systems that
> > > > > > >> > > > > support such interface. And it is not guaranteed that we
> > > > > > >> > > > > can persuade the developer of the input system to add
> > > > > > >> > > > > this interface.
> > > > > > >> > > > > This is not desirable for Samza in the long term.
> > > > > > >> > > >
> > > > > > >> > > > Agreed. It is very wishful thinking that each supported
> > > > > > >> > > > system would provide such a contract.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > > - I can not comment on the second alternative because I
> > > > > > >> > > > > don't understand how it reshuffles all existing
> > > > > > >> > > > > changelog data. We can discuss more if there is more
> > > > > > >> > > > > specific detail. My gut feel is that this will be
> > > > > > >> > > > > complex and carries performance overhead.
> > > > > > >> > > >
> > > > > > >> > > > After giving this more thought, I agree. There is no clear
> > > > > > >> > > > way to migrate a changelog without knowing the original
> > > > > > >> > > > key->partition mapping. Which leads us to alternative 3...
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > > - The third alternative requires performance overhead.
> > > > > > >> > > > > Given that user can already use this solution to enable
> > > > > > >> > > > > partition expansion, maybe Samza developers can provide
> > > > > > >> > > > > more input as to why we are not doing it by default. My
> > > > > > >> > > > > gut feel is that it carries considerable performance
> > > > > > >> > > > > overhead and increases the cost-to-serve Samza job (e.g.
> > > > > > >> > > > > disk usage), which may make it undesirable in the long
> > > > > > >> > > > > term.
> > > > > > >> > > >
> > > > > > >> > > > I think the only performance overhead would be the
> > > > > > >> > > > mandatory repartitioning stage for stateful jobs. But a
> > > > > > >> > > > repartitioner is usually much faster than the downstream
> > > > > > >> > > > stateful job, so it only seems to be a cost-to-serve issue.
> > > > > > >> > > >
> > > > > > >> > > > As for why we aren't already doing this, I would posit
> > > > > > >> > > > that before the introduction of the high level API, which
> > > > > > >> > > > trivializes repartitioning, it was unreasonable to expect
> > > > > > >> > > > each job owner to do the mandatory repartitioning. With the
> > > > > > >> > > > high level API, I would argue this is much more doable.
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > > I am not sure it is true that "any future feature that
> > > > > > >> > > > > utilizes this mapping without accounting for the
> > > > > > >> > > > > assumptions of this SEP is likely to malfunction".
> > > > > > >> > > > > Suppose we allow user to specify new-to-old-partition
> > > > > > >> > > > > mapping, then we can use the partition-to-task mapping
> > > > > > >> > > > > correctly without relying on the assumption in this SEP,
> > > > > > >> > > > > right?
> > > > > > >> > > >
> > > > > > >> > > > Right, but my point was that the partition->task mapping is
> > > > > > >> > > > not sufficient by itself. So adding it by itself is
> > > > > > >> > > > potentially misleading.
> > > > > > >> > > >
> > > > > > >> > > > On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin
> > > > > > >> > > > <lindon...@gmail.com> wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Thanks for the reply Jacob. Please see my comment inline.
> > > > > > >> > > > >
> > > > > > >> > > > > On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes
> > > > > > >> > > > > <jacob.m...@gmail.com> wrote:
> > > > > > >> > > > >
> > > > > > >> > > > > > > - For users that need partition expansion of the
> > > > > > >> > > > > > > input streams for a stateful job, they have a really
> > > > > > >> > > > > > > big headache in the sense that Samza does not allow
> > > > > > >> > > > > > > partition expansion for stateful jobs. SEP-5
> > > > > > >> > > > > > > addresses this headache for them.
> > > > > > >> > > > > > > You are right that SEP-5 requires users to understand
> > > > > > >> > > > > > > and enforce limitations across organizations. But it
> > > > > > >> > > > > > > is still much better than not allowing users to
> > > > > > >> > > > > > > expand partitions for stateful jobs at all, right?
> > > > > > >> > > > > > > Did I miss something here?
> > > > > > >> > > > > >
> > > > > > >> > > > > > I guess this one is a matter of perspective.
> > > > > > >> > > > > >
> > > > > > >> > > > > > One argument is that if the system supports one case,
> > > > > > >> > > > > > it's better than none because there is one less
> > > > > > >> > > > > > scenario in which the system does the wrong thing.
> > > > > > >> > > > > >
> > > > > > >> > > > > > The counter argument is for uniform and consistent
> > > > > > >> > > > > > behavior, which is easy for users to understand and
> > > > > > >> > > > > > properly leverage.
> > > > > > >> > > > > >
> > > > > > >> > > > > > Specifically, I'd argue that the current rule is very
> > > > > > >> > > > > > simple: "you cannot repartition inputs on a stateful
> > > > > > >> > > > > > job, so you must over-partition the initial
> > > > > > >> > > > > > implementation". To me, while that rule is not ideal,
> > > > > > >> > > > > > its simplicity is better than introducing a new
> > > > > > >> > > > > > solution that has a bunch of caveats, any one of which
> > > > > > >> > > > > > could be missed. If any one of the assumptions in this
> > > > > > >> > > > > > SEP design is violated, the job would behave
> > > > > > >> > > > > > incorrectly. That puts a lot more burden on the users
> > > > > > >> > > > > > than the simpler rule.
> > > > > > >> > > > >
> > > > > > >> > > > > I agree that we have different perspectives here. It is
> > > > > > >> > > > > true that users would mess up their job if they used
> > > > > > >> > > > > this feature in a wrong way, i.e. violated the
> > > > > > >> > > > > assumption made in SEP-5. On the other hand, I think
> > > > > > >> > > > > there is always a way for user to mess up their job if
> > > > > > >> > > > > they configure the Samza job incorrectly. I also think
> > > > > > >> > > > > the assumption made in this SEP is not particularly
> > > > > > >> > > > > harder to understand than other existing configs in
> > > > > > >> > > > > Samza.
> > > > > > >> > > > >
> > > > > > >> > > > > The answer to this can be subjective.
I would love to hear perspectives from other developers on this issue.
> > > > > > >> > > > > > That's why I mentioned a few alternatives that, while more complex to implement, would provide a more consistent behavior with simple rules for the users.
> > > > > > >> > > > > I am open to discussing alternative solutions that can address the problem in a better manner. I am not opposed to complexity as long as it gives us good long term benefits.
> > > > > > >> > > > > Here are my current concerns with the three alternatives you described earlier:
> > > > > > >> > > > > - The first alternative requires support from the input system which is currently not available. It will limit the usage of partition expansion to only systems that support such an interface. And it is not guaranteed that we can persuade the developer of the input system to add this interface. This is not desirable for Samza in the long term.
> > > > > > >> > > > > - I can not comment on the second alternative because I don't understand how it reshuffles all existing changelog data.
We can discuss more if there is more specific detail. My gut feel is that this will be complex and carries performance overhead.
> > > > > > >> > > > > - The third alternative carries performance overhead. Given that users can already use this solution to enable partition expansion, maybe Samza developers can provide more input as to why we are not doing it by default. My gut feel is that it carries considerable performance overhead and increases the cost-to-serve of a Samza job (e.g. disk usage), which may make it undesirable in the long term.
> > > > > > >> > > > > > > Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well. If there are more grouper classes needed in the future, we can solve this problem cleanly without new config. Given the previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will throw an exception if and only if newGrouperClass is neither previousGrouperClass nor a subclass of it.
> > > > > > >> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum should extend GroupByPartition. Does this address your concern?
> > > > > > >> > > > > > Sounds workable, thanks.
> > > > > > >> > > > > > > Can you be more specific why Partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments and why it is incomplete and misleading?
> > > > > > >> > > > > > A partition is (in my naive interpretation) an independent queue for messages of a particular key set. It is not the *identity* of the partition that determines the contents of the associated task's local state. Rather it is the *contents* of the partition that affect the task's state. A partition-to-task mapping only captures an identity relationship: partition1->task1. Without the assumptions of this SEP, this is insufficient to determine the assignment of keys to tasks, which is what really matters.
Therefore, any future feature that utilizes this mapping without accounting for the assumptions of this SEP is likely to malfunction.
> > > > > > >> > > > > I am not sure it is true that "any future feature that utilizes this mapping without accounting for the assumptions of this SEP is likely to malfunction". Suppose we allow users to specify the new-to-old-partition mapping, then we can use the partition-to-task mapping correctly without relying on the assumptions in this SEP, right?
> > > > > > >> > > > > > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >> > > > > > > Hey Jacob,
> > > > > > >> > > > > > > Thanks for the explanation. It seems that your biggest concern is with the generality of the proposal. Let me try to address this and other comments below.
> > > > > > >> > > > > > > 1) ... it will cause headaches for Samza users ...
> > > > > > >> > > > > > > I am not sure I understand why this proposal causes headaches for Samza users.
Here is the impact of SEP-5 on users:
> > > > > > >> > > > > > > - For users that do not need partition expansion of the input stream, they can use Samza without any change in code/binary/config. Thus there is no headache for them.
> > > > > > >> > > > > > > - For users that need partition expansion of the input streams for stateless job, they currently need to manually reboot their Samza job in order to let Samza consume the new partitions created for the stream. SEP-5 actually reduces their headache by allowing Samza to automatically detect and consume new partitions.
> > > > > > >> > > > > > > - For users that need partition expansion of the input streams for stateful job, they have a really big headache in the sense that Samza does not allow partition expansion for stateful job. SEP-5 addresses this headache for them.
> > > > > > >> > > > > > > You are right that SEP-5 requires user to understand and enforce limitations across organizations.
But it is still much better than not allowing users to expand partitions for stateful jobs at all, right? Did I miss something here?
> > > > > > >> > > > > > > 2) ... Separate orgs are often difficult to coordinate and a system which depends on such significant process/coordination is too fragile for my taste ..
> > > > > > >> > > > > > > This is true. Ideally we want a system that is fully self-serving. I think this is a long term goal for Samza. Still, for the reasons described above, I think something is better than nothing. I am open to an alternative design that can support partition expansion for stateful jobs without requiring coordination.
> > > > > > >> > > > > > > 3) There is currently no supported way of sharing state among the tasks of a container. Each task has its own isolated store and that logical isolation is the primary thing that enables Samza jobs to scale with a simple container count change.
My feeling is that we should not change this without good reason.
> > > > > > >> > > > > > > I see your point. I will remove this sentence from the motivation section. This won't have any impact on the design of SEP-5. Does this address the problem?
> > > > > > >> > > > > > > 4) With the current proposal, we'd also need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other groupers were later added with both these modes, we'd probably need to add those too. It might be easier and cleaner to add a config to ignore that check temporarily. Down side is that it further complicates the Samza config, which is already huge. Thoughts?
> > > > > > >> > > > > > > Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well. If there are more grouper classes needed in the future, we can solve this problem cleanly without new config.
Given the previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will throw an exception if and only if newGrouperClass is neither previousGrouperClass nor a subclass of it. GroupBySystemStreamPartitionWithFixedTaskNum should extend GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum should extend GroupByPartition. Does this address your concern?
> > > > > > >> > > > > > > 5) The task-to-container and container-to-host mappings are both meaningful in context of the JobModel. Partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments. It's incomplete information and therefore misleading. I think it only makes sense to use this mapping if we adopt a solution wherein Samza also knows the partition key assignment.
> > > > > > >> > > > > > > Partition-to-task is currently explicitly passed from job coordinator to each task as part of the job model to tell tasks which partitions to consume from.
I think we can store some definition of the key-to-partition assignments if Samza decides to get and use this information in the future. Can you be more specific why Partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments and why it is incomplete and misleading?
> > > > > > >> > > > > > > Thanks,
> > > > > > >> > > > > > > Dong
> > > > > > >> > > > > > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <jacob.m...@gmail.com> wrote:
> > > > > > >> > > > > > > > Hey Dong,
> > > > > > >> > > > > > > > I'm opposed (or a +0, at best) to this limited, Kafka-specific solution. I understand that the proposal is relatively simple to implement, but I think it will cause headaches for Samza users.
They will not only have to understand all the limitations (increase only, double partitions only, partition using hash+modulo, etc) of this approach, but enforcing these limitations can be a major problem, especially when the Samza jobs and message brokers are managed by separate orgs in a company. Separate orgs are often difficult to coordinate and a system which depends on such significant process/coordination is too fragile for my taste.
> > > > > > >> > > > > > > > That said, I realize that my opinion is just one of many in the broader community which may feel differently, so let me respond to some of the other items in the discussion so we can clear them up:
> > > > > > >> > > > > > > > > The task-to-container assignment matters because the correlated tasks (i.e. tasks that consume messages with the same key) need to be in the same container so that they can share the same key/value local store on the same physical machine.
> > > > > > >> > > > > > > > There is currently no supported way of sharing state among the tasks of a container. Each task has its own isolated store and that logical isolation is the primary thing that enables Samza jobs to scale with a simple container count change. My feeling is that we should not change this without good reason.
> > > > > > >> > > > > > > > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that exception will not be thrown if new grouper is GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition. Does this look reasonable?
> > > > > > >> > > > > > > > With the current proposal, we'd also need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well.
And if any other groupers were later added with both these modes, we'd probably need to add those too. It might be easier and cleaner to add a config to ignore that check temporarily. Down side is that it further complicates the Samza config, which is already huge. Thoughts?
> > > > > > >> > > > > > > > > I think storing the previous task-to-partition mapping is more general than storing the partition count of all topics for the following reasons:
> > > > > > >> > > > > > > > > - Samza already stores the task-to-container mapping and container-to-host mapping in the coordinator stream. It seems consistent to also store the partition-to-task mapping. And this information may be useful for other use-case such as debugging.
> > > > > > >> > > > > > > > > - By having the new interface take the previous task-to-partition assignment instead of a topic-to-partition-count mapping as new parameter, we can potentially have grouper implementation to support other types of input systems.
> > > > > > >> > > > > > > > > - It is slightly simpler to store the task-to-partition assignment because we don't need to know whether this is the first time a job is started or not. On the other hand, you can write topic-to-partition-count mapping to the coordinator stream only if this is the first time the job is run
> > > > > > >> > > > > > > > The task-to-container and container-to-host mappings are both meaningful in context of the JobModel. Partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments. It's incomplete information and therefore misleading.
I think it only makes sense to use this mapping if we adopt a solution wherein Samza also knows the partition key assignment.
> > > > > > >> > > > > > > > -Jake
> > > > > > >> > > > > > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > >> > > > > > > > > Hey Jacob,
> > > > > > >> > > > > > > > > Thanks for taking time to review the SEP.
> > > > > > >> > > > > > > > > I agree with you and Navina that the current SEP doesn't provide support for arbitrary input systems and it doesn't support partition shrink. I think the scope of this SEP is to support partition expansion for Kafka (the most widely used input system of Samza) and keep the door open for partition support of various input systems. The current design can support any system that meets the two operational requirements specified in the doc.
> > > > > > >> > > > > > > > > While it is possible to support more types of input systems, it will likely add more complexity to the design. For example, the first alternative solution from you requires broker-side support to negotiate the hash algorithm. The second alternative solution requires changelog partition reshuffle which carries its own design complexity and performance overhead. There is a tradeoff between the generality and the complexity among these choices. I like the current design because it is simple and addresses a big usage scenario for us. We can add more complexity to generalize the design if it enables important use-cases. Does this sound reasonable?
> > > > > > >> > > > > > > > > Note that the "Rejected Alternative" section also mentions the possibility of supporting a wider range of input systems by allowing user to specify the new-partition to old-partition mapping.
We are not doing it because 1) we may have better understanding of the design after we have a specific second input system to support 2) the current design can be extended to support general input systems. I think a similar argument can be applied to explain why we don't have to support general input systems using the potentially-good alternatives you mentioned.
> > > > > > >> > > > > > > > > I hope SEP-5 can be an important first-step towards supporting partition expansion of any input system.
> > > > > > >> > > > > > > > > To answer your questions about the current proposal:
> > > > > > >> > > > > > > > > >1. "An alternative solution is to allow task number to increase after partition expansion and uses a proper task-to-container assignment to make sure the Samza output is correct." What does the container have to do with stateful processing or output in general?
> > > > > > >> > > > > > > > > The task-to-container assignment matters because the correlated tasks (i.e. tasks that consume messages with the same key) need to be in the same container so that they can share the same key/value local store on the same physical machine.
> > > > > > >> > > > > > > > > >2. When you use "Join" as an example, you basically mean multiple co-partitioned streams, right? This is opposed to multiple, independently-partitioned streams or a single stream. Would be nice to formulate the proposal in these more general terms.
> > > > > > >> > > > > > > > > I thought "join" is commonly used to refer to the join operation with co-partitioned streams but I may be wrong. I have updated the wiki to explicitly mention "co-partitioned stream". Does this look better now?
> > > > > > >> > > > > > > > > >3. When switching SSP groupers, how will the users avoid the org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues exception?
> > > > > > >> > > > > > > > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that exception will not be thrown if new grouper is GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition. Does this look reasonable?
> > > > > > >> > > > > > > > > >4. Partition to task assignment is meaningless without key to partition mapping. The real semantics are captured in the external requirement for partitioning via hash+modulo. But in that case, iiuc, only the partition count matters. So why not just store the original partition count rather than the whole mapping?
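For reference, the hash+modulo requirement mentioned in question 4 can be illustrated with a small sketch. This is not Samza or Kafka code: `partition_for` and `old_partition_for` are hypothetical helper names, key hashes are modeled as plain integers, and the sketch assumes the partition count grows from N to an integer multiple of N (for example by doubling). Under those assumptions, every key that lands in new partition p previously landed in old partition p mod N.

```python
def partition_for(key_hash: int, partition_count: int) -> int:
    """Hash+modulo partitioner: the partition is the key hash modulo the count."""
    return key_hash % partition_count


def old_partition_for(new_partition: int, old_count: int) -> int:
    """Old partition that held the keys now found in new_partition.

    Valid only when the new partition count is a multiple of old_count.
    """
    return new_partition % old_count


# Illustrative check: expand from 4 to 8 partitions (hypothetical counts).
old_count, new_count = 4, 8
for key_hash in range(1000):
    before = partition_for(key_hash, old_count)
    after = partition_for(key_hash, new_count)
    # The new partition always maps back to the partition used before expansion.
    assert old_partition_for(after, old_count) == before
```

With this property, knowing the original partition count is enough to route a new partition to the task that owns the state for its keys, which is the observation behind question 4; it breaks down if the partitioner is not hash+modulo or the new count is not a multiple of the old one.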
> > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > I think storing the previous task-to-partition > > > > mapping > > > > > > is > > > > > > >> > more > > > > > > >> > > > > > general > > > > > > >> > > > > > > > than > > > > > > >> > > > > > > > > storing the partition count of all topics for > > the > > > > > > >> following > > > > > > >> > > > > reasons: > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > - Samza already stores the task-to-container > > > mapping > > > > > and > > > > > > >> > > > > > > > container-to-host > > > > > > >> > > > > > > > > mapping in the coordinator stream. It seems > > > > consistent > > > > > > to > > > > > > >> > also > > > > > > >> > > > > store > > > > > > >> > > > > > > the > > > > > > >> > > > > > > > > partition-to-task mapping. And this > information > > > may > > > > be > > > > > > >> useful > > > > > > >> > > for > > > > > > >> > > > > > other > > > > > > >> > > > > > > > > use-case such as debugging. > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > - By having the new interface take the > previous > > > > > > >> > > task-to-partition > > > > > > >> > > > > > > > > assignment instead of a > topic-to-partition-count > > > > > mapping > > > > > > >> as > > > > > > >> > new > > > > > > >> > > > > > > > parameter, > > > > > > >> > > > > > > > > we can potentially have grouper implementation > > to > > > > > > support > > > > > > >> > other > > > > > > >> > > > > types > > > > > > >> > > > > > > of > > > > > > >> > > > > > > > > input systems. > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > - It is sightly simpler to store the > > > > task-to-partition > > > > > > >> > > assignment > > > > > > >> > > > > > > because > > > > > > >> > > > > > > > > we don't need to know whether this is the > first > > > > time a > > > > > > >> job is > > > > > > >> > > > > started > > > > > > >> > > > > > > or > > > > > > >> > > > > > > > > not. 
On the other hand, you can write > > > > > > >> > topic-to-partition-count > > > > > > >> > > > > > mapping > > > > > > >> > > > > > > to > > > > > > >> > > > > > > > > the coordinator stream only if this is the > first > > > > time > > > > > > the > > > > > > >> job > > > > > > >> > > is > > > > > > >> > > > > run > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > Thanks, > > > > > > >> > > > > > > > > Dong > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes < > > > > > > >> > > > jacob.m...@gmail.com> > > > > > > >> > > > > > > > wrote: > > > > > > >> > > > > > > > > > > > > > > >> > > > > > > > > > Hey Dong, > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > Thanks for the SEP. Supporting partition > > changes > > > > is > > > > > > >> > > critically > > > > > > >> > > > > > > > important > > > > > > >> > > > > > > > > > for stateful Samza jobs, so it's great to > see > > > some > > > > > > >> ideas on > > > > > > >> > > > that > > > > > > >> > > > > > > front! > > > > > > >> > > > > > > > > > > > > > > > >> > > > > > > > > > Sorry for the late feedback, but I have a > few > > > > > thoughts > > > > > > >> to > > > > > > >> > > > > > contribute. 

Big +1 on Navina's comment:

> My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system and yet, we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design). I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view for this from other devs.

Two examples of this:
1. This is mostly a hypothetical, but some message brokers may use key range assignment rather than hash+modulo.
2. Kafka can't reduce the number of partitions, but it can happen on other systems. For example, it may be cheaper to reduce the number of partitions on a hosted service where the cost model depends on the number of partitions/shards.

It seems to me that we want a solution that doesn't depend on partition key assignment in the message broker. Here are a few alternatives that weren't discussed and I think should be considered:

Alternatives in order of increasing preference:
1. Samza manages the partition hash (via some new contract with the brokers) and guarantees correct routing of keys among the new partitions.
2. Samza detects a task count change, creates a new changelog with correct partitions, and *somehow* reshuffles all existing changelog data into the new topic and then uses the new topic from then on. (Doesn't work without a changelog, but in that case durability isn't paramount, so we can just wipe.)
3. Use RPC in between stages, and Samza fully manages key assignment among tasks. No on-disk topic data to clean up. Mandatory repartitioning in the first stage to pre-scaled tasks in the next stage.
4. Combined 2-3 solution

Finally, some questions about the current proposal:
1. "An alternative solution is to allow task number to increase after partition expansion and uses a proper task-to-container assignment to make sure the Samza output is correct." What does the container have to do with stateful processing or output in general?
2. When you use "Join" as an example, you basically mean multiple co-partitioned streams, right? This is opposed to multiple, independently-partitioned streams or a single stream. Would be nice to formulate the proposal in these more general terms.
3. When switching SSP groupers, how will the users avoid the org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues exception?
4. Partition to task assignment is meaningless without key to partition mapping. The real semantics are captured in the external requirement for partitioning via hash+modulo. But in that case, iiuc, only the partition count matters. So why not just store the original partition count rather than the whole mapping?

-Jake

On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Yi, Navina,

I have updated the SEP-5 document based on our discussion. The difference can be found here
<https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>.
Here is the summary of changes:

- Added a new interface that extends the existing interface SystemStreamPartitionGrouper. Newly-added grouper classes should implement this interface.
- Explained in the Rejected Alternative Section why we don't add a new method to the existing interface.
- Explained in the Rejected Alternative Section why we don't add a config/class for the user to specify the new-partition to old-partition mapping.

Can you take another look at the proposal and let me know if there is any concern?

Cheers,
Dong

On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Yi,

Thanks much for the comment. I have updated the doc to address all your comments except the one related to the interface. I am not sure I understand your suggestion of the new interface. Will discuss tomorrow.

Thanks,
Dong

On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:

Hi, Dong,

Thanks for the detailed design doc for a long-awaited feature in Samza! Really appreciate it! I did a quick pass and have the following comments:

- minor: "limit the maximum size of partition" ==> "limit the maximum size of each partition"
- "However, Samza currently is not able to handle partition expansion of the input streams" ==> better point out "for stateful jobs". For stateless jobs, simply bouncing the job now can pick up the new partitions.
- "it is possible (e.g. with Kafka) that messages with a given key exists in both partition 1 and 3. Because GroupByPartition will assign partition 1 and 3 to different tasks, messages with the same key may be handled by different task/container/process and their state will be stored in different changelog partition." The problem statement is not super clear here. The issue with stateful jobs is: after GroupByPartition assigns partitions 1 and 3 to different tasks, the new task handling partition 3 does not have the previous state to resume the work. E.g. a page-key based counter would start from 0 in the new task for a specific key, instead of resuming the previous count of 50 held by task 1.
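
Editorial sketch of the counter example above, not code from the SEP. It assumes hash+modulo partitioning and an expansion from 2 to 4 partitions; the key hash value, the function names, and the `partition % original_task_count` rule for the fixed-task grouper are all illustrative assumptions rather than Samza APIs.

```python
from collections import defaultdict

KEY_HASH = 7  # hypothetical hash of one page key


def partition_of(key_hash, partition_count):
    """Hash+modulo partitioning, as assumed in the discussion."""
    return key_hash % partition_count


def task_per_partition(partition, original_task_count):
    """GroupByPartition-like behavior: every partition gets its own task."""
    return partition


def task_fixed_count(partition, original_task_count):
    """Assumed fixed-task behavior: fold new partitions onto the original tasks."""
    return partition % original_task_count


def count_after_expansion(task_for):
    """Count 50 messages with 2 partitions, then 1 more after expanding to 4."""
    state = defaultdict(int)  # (task, key_hash) -> running count
    for _ in range(50):
        p = partition_of(KEY_HASH, 2)            # partition 1
        state[(task_for(p, 2), KEY_HASH)] += 1
    p = partition_of(KEY_HASH, 4)                # partition 3 after expansion
    task = task_for(p, 2)
    state[(task, KEY_HASH)] += 1
    return state[(task, KEY_HASH)]


print(count_after_expansion(task_per_partition))  # new task has no prior state
print(count_after_expansion(task_fixed_count))    # original task keeps its state
```

With one task per partition the key lands on a fresh task and the count restarts; with a fixed task count the message for partition 3 is routed back to the task that already holds the count for that key. This relies on the new partition count being a multiple of the old one, since only then does `(h % new) % old == h % old` hold for every hash `h`.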
- minor rewording: "the first solution in this doc" ==> "the solution proposed in this doc"
- "Thus additional development work is needed in Kafka to meet this requirement" It would be good to link to a KIP if and when it exists
- Instead of touching/deprecating the interface SystemStreamPartitionGrouper, I would recommend to have a different implementation class of the interface, which in the constructor of the grouper, takes two parameters: a) the previous task number read from the coordinator stream; b) the configured new-partition to old-partition mapping policy. Then, the grouper's interface method stays the same and the behavior of the grouper is more configurable which is good to support a broader set of use cases in addition to Kafka's built-in partition expansion policies.
- Minor renaming suggestion to the new grouper class names: GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum

Thanks!

- Yi

On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Navina,

Thanks much for the comment. Please see my response below.

Regarding your biggest gripe with the SEP, I personally think the operational requirements proposed in the KIP are pretty general and could be easily enforced by other systems. The reason is that the modulo operation is pretty standard and is the default option when we choose a partition. And usually the underlying system allows the user to select an arbitrary partition number if it supports partition expansion. Do you know any system that does not meet these two requirements?

Regarding your comment on the Motivation section, I renamed the first section to "Problem and Goal" and specified that "*The goal of this proposal is to enable partition expansion of the input streams*.". I also put a sentence at the end of the Motivation section that "*The feature of task expansion is out of the scope of this proposal and will be addressed in a future SEP*". The second paragraph in the Motivation section is mainly used to explain the thinking process that we have gone through, what other alternatives we have considered, and what we plan to do in Samza in the next step.

To answer your question why increasing the partition number will increase the throughput of the Kafka consumer in the container: the Kafka consumer can potentially fetch more data in one FetchResponse with more partitions in the FetchRequest. This is because we limit the maximum amount of data that can be fetched for a given partition in the FetchResponse. This by default is set to 1 MB. And there are reasons why we cannot arbitrarily bump up this limit.
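
Editorial note: the per-partition limit described above implies a simple upper bound on how much one fetch can return. The sketch below only multiplies the 1 MB default quoted in the message by the number of partitions in the request; the function and constant names are hypothetical, and it deliberately ignores any overall response-size cap or broker-side limit that may also apply.

```python
# 1 MB per-partition default, as quoted in the message above.
PER_PARTITION_LIMIT_BYTES = 1 * 1024 * 1024


def max_bytes_per_fetch(partitions_in_request,
                        per_partition_limit=PER_PARTITION_LIMIT_BYTES):
    """Upper bound on data returned by one fetch, from the per-partition cap alone."""
    return partitions_in_request * per_partition_limit


# Doubling the partitions in the request doubles this upper bound.
print(max_bytes_per_fetch(4))
print(max_bytes_per_fetch(8))
```

This is only an upper bound: the consumer still reads the same total volume of data, as Navina points out, but each round trip can carry more of it.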

To answer your question how partition expansion in Kafka impacts the clients: the Kafka consumer is able to automatically detect new partitions of the topic and reassign all (both old and new) partitions across consumers in the consumer group IF you tell the consumer which topic to subscribe to. But the consumer in Samza's container uses another way of subscription. Instead of subscribing to the topic, the consumer in Samza's container subscribes to specific partitions of the topic. In this case, if new partitions have been added, Samza will need to explicitly subscribe to the new partitions of the topic. The "Handle partition expansion while tasks are running" section in the SEP addresses this issue in Samza -- it recalculates the job model and restarts the container so that the consumer can subscribe to the new partitions.

I will ask other devs to take a look at the proposal. I will start the voting thread tomorrow if there is no further concern with the SEP.

Thanks!
Dong

On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:

Hey Dong,

> I have updated the motivation section to clarify this.
> > > > > > >> > > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > >> > > Thanks for updating the motivation. > > > > Couple > > > > > of > > > > > > >> > notes > > > > > > >> > > > > here: > > > > > > >> > > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > >> > > 1. > > > > > > >> > > > > > > > > > > >> > > > "The motivation of increasing > > > partition > > > > > > >> number > > > > > > >> > of > > > > > > >> > > > > Kafka > > > > > > >> > > > > > > > topic > > > > > > >> > > > > > > > > > > >> includes > > > > > > >> > > > > > > > > > > >> > 1) > > > > > > >> > > > > > > > > > > >> > > limit the maximum size of a > partition > > > in > > > > > > order > > > > > > >> to > > > > > > >> > > > > improve > > > > > > >> > > > > > > > broker > > > > > > >> > > > > > > > > > > >> > > performance and 2) increase > > throughput > > > of > > > > > > Kafka > > > > > > >> > > > consumer > > > > > > >> > > > > > in > > > > > > >> > > > > > > > the > > > > > > >> > > > > > > > > > > Samza > > > > > > >> > > > > > > > > > > >> > > container." > > > > > > >> > > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > >> > > It's unclear to me how increasing > the > > > > > > partition > > > > > > >> > > number > > > > > > >> > > > > > will > > > > > > >> > > > > > > > > > increase > > > > > > >> > > > > > > > > > > >> the > > > > > > >> > > > > > > > > > > >> > > throughput of the kafka consumer in > > the > > > > > > >> container? 
> > > > > > >> > > > > > > > > Theoretically, > > > > > > >> > > > > > > > > > > you > > > > > > >> > > > > > > > > > > >> > will > > > > > > >> > > > > > > > > > > >> > > still be consuming the same amount > of > > > > data > > > > > in > > > > > > >> the > > > > > > >> > > > > > container, > > > > > > >> > > > > > > > > > > >> irrespective > > > > > > >> > > > > > > > > > > >> > > of whether it is coming from one > > > > partition > > > > > or > > > > > > >> more > > > > > > >> > > > than > > > > > > >> > > > > > one > > > > > > >> > > > > > > > > > expanded > > > > > > >> > > > > > > > > > > >> > > partitions. Can you please explain > it > > > for > > > > > me > > > > > > >> here, > > > > > > >> > > > what > > > > > > >> > > > > > you > > > > > > >> > > > > > > > mean > > > > > > >> > > > > > > > > > by > > > > > > >> > > > > > > > > > > >> that? > > > > > > >> > > > > > > > > > > >> > > > > > > > > >> > > > > > > > > > > >> > > 2. I believe the second paragraph > > under > > > > > > >> motivation > > > > > > >> > > is > > > > > > >> > > > > > simply > > > > > > >> > > > > > > > > > talking > > > > > > >> > > > > > > > > > > >> > about > > > > > > >> > > > > > > > > > > >> > > the scope of the current SEP. It > will > > > be > > > > > > >> easier to > > > > > > >> > > > read > > > > > > >> > > > > if > > > > > > >> > > > > > > > what > > > > > > >> > > > > > > > > > > >> solution > > > > > > >> > > > > > > > > > > >> > is > > > > > > >> > > > > > > > > > > >> > > included in this SEP and what is > left > > > out > > > > > as > > > > > > >> not > > > > > > >> > in > > > > > > >> > > > > scope. > > > > > > >> > > > > > > > (for > > > > > > >> > > > > > > > > > > >> example, > > > > > > >> > > > > > > > > > > >> > > expansions for stateful jobs is > > > supported > > > > > or > > > > > > >> not). 
>
> > We need to persist the task-to-sspList mapping in the coordinator stream
> > so that the job can derive the original number of partitions of each
> > input stream regardless of how many times the partition has expanded.
> > Does this make sense?
>
> Yes. It does!
>
> > I am not sure how this is related to the locality though. Can you
> > clarify your question if I haven't answered your question?
>
> It's not related. I just meant to give an example of yet another
> coordinator message that is persisted. Your ssp-to-task mapping is
> following a similar pattern for persisting. Just wanted to clarify that.
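The derivation discussed above (recovering the original partition count from the persisted task-to-SSP-list mapping) can be sketched as follows. This is a minimal illustration, not Samza code: the function name and the tuple layout are hypothetical, and it assumes the grouper creates one task per original partition of a stream, so the number of distinct tasks consuming a stream equals that stream's original partition count no matter how many partitions were added later.

```python
from collections import defaultdict


def original_partition_counts(task_to_ssps):
    """Map each (system, stream) to the number of distinct tasks consuming it.

    task_to_ssps: dict of task name -> list of (system, stream, partition)
    tuples, standing in for the mapping persisted in the coordinator stream.
    Assumption: one task was created per original partition of each stream.
    """
    tasks_per_stream = defaultdict(set)
    for task, ssps in task_to_ssps.items():
        for system, stream, _partition in ssps:
            tasks_per_stream[(system, stream)].add(task)
    return {stream: len(tasks) for stream, tasks in tasks_per_stream.items()}


# A stream that started with 2 partitions and was later expanded to 4.
persisted = {
    "Partition 0": [("kafka", "clicks", 0), ("kafka", "clicks", 2)],
    "Partition 1": [("kafka", "clicks", 1), ("kafka", "clicks", 3)],
}
counts = original_partition_counts(persisted)
```

Counting tasks rather than partitions is what keeps the result stable across repeated expansions, since the task set never changes under this proposal.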
>
> > Can you let me know if this, together with the answers in the previous
> > email, addresses all your questions?
>
> Yes. I believe you have addressed most of my questions. Thanks for taking
> time to do that.
>
> > Is there a specific question you have regarding partition expansion in
> > Kafka?
>
> I guess my questions are on how partition expansion in Kafka impacts the
> clients. IIUC, partition expansions are done manually in Kafka based on the
> bytes-in rate of the partition. Do the existing Kafka clients handle this
> expansion automatically? If yes, how does it work?
> If not, are there plans to support it in the future?
>
> > Thus user's job should not need to bootstrap key/value store from the
> > changelog topic.
>
> Why is this discussion relevant here? Key/value store / changelog topic
> partition is scoped with the context of a task. Since we are not changing
> the number of tasks, I don't think it is required to mention it here.
>
> > The new method takes the SystemStreamPartition-to-Task assignment from
> > the previous job model which can be read from the coordinator stream.
>
> JobModel is currently not persisted to the coordinator stream. In your
> design, you talk about writing separate coordinator messages for
> ssp-to-task assignments. Hence, please correct this statement. It is kind
> of misleading to the reader.
>
> My biggest gripe with this SEP is that it seems like a tailor-made solution
> that relies on the semantics of the Kafka system and yet, we are trying to
> masquerade that as operational requirements for other systems interacting
> with Samza. (Not to say that this is the first time such a choice is being
> made in the Samza design).
> I am not seeing how this can be a "general" solution for all input
> systems. That's my two cents. I would like to hear alternative points of
> view for this from other devs.
>
> Please make sure you have enough eyes on this SEP. If you do, please start
> a VOTE thread to approve this SEP.
>
> Thanks!
> Navina
>
> On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Navina,
> >
> > I have updated the wiki based on your suggestion.
> > More specifically, I have made the following changes:
> >
> > - Improved Problem section and Motivation section to describe why we use
> >   the solution in this proposal instead of tackling the problem of task
> >   expansion directly.
> >
> > - Illustrated the design in a way that doesn't bind to Kafka. Kafka is
> >   only used as an example to illustrate why we want to support partition
> >   expansion and whether the operational requirement can be supported
> >   when Kafka is used as the input system.
> >   Note that the proposed solution should work for any input system that
> >   meets the operational requirement described in the wiki.
> >
> > - Fixed the problem in the figure.
> >
> > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki.
> >   Together with GroupByPartitionFixedTaskNum, it should ensure that we
> >   have a solution to enable partition expansion for all users that are
> >   using a pre-defined grouper in Samza. Note that those users who use a
> >   custom grouper would need to update their implementation.
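A fixed-task grouper of the kind listed above might be sketched like this. It is an illustration under stated assumptions, not the implementation on the wiki: the function name is hypothetical, and the modulo rule assumes an input system where a key that lands in new partition p used to land in partition p mod the original count (as with hash-mod partitioning when the partition count is multiplied).

```python
def group_by_partition_fixed_tasks(current_partitions, original_count):
    """Assign every current partition to one of `original_count` tasks.

    The task count stays fixed at the original partition count; partitions
    added by an expansion are folded back onto the task that owned the
    partition their keys came from (partition id modulo original count).
    """
    if original_count <= 0:
        raise ValueError("original_count must be positive")
    groups = {task_id: [] for task_id in range(original_count)}
    for partition in sorted(current_partitions):
        groups[partition % original_count].append(partition)
    return groups


# A topic expanded from 2 to 4 partitions still yields 2 tasks.
assignment = group_by_partition_fixed_tasks(range(4), 2)
```

Because the task for a key does not change, each task's local state remains valid after the expansion, which is the property the thread is concerned with.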
> >
> > Can you let me know if this, together with the answers in the previous
> > email, addresses all your questions? Thanks for taking time to review
> > the proposal.
> >
> > Regards,
> > Dong
> >
> > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Navina,
> > >
> > > Thanks much for your comments. Please see my reply inline.
> > >
> > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache)
> > > <nav...@apache.org> wrote:
> > >
> > > > Thanks for the SEP, Dong. I have a couple of questions to understand
> > > > your proposal better:
> > > >
> > > > * Under motivation, you mention that "_We expect this solution to ...