Great discussion! Here are some more thoughts:
The point that repartitioning is a more general-purpose solution is surely spot on. For many source systems (Kinesis, Google Pub/Sub, and older queuing systems such as RabbitMQ), repartitioning is functionally required anyway, even for simple keyed aggregations. But in most of these systems, the concept of changing a stream's partition count either does not exist or exists in a very system-specific form (e.g. Kinesis). I think this feature is really only interesting for source systems like Kafka and EventHub. EventHub (last I checked) didn't support repartitioning, so this is probably not very interesting (yet) for EventHub. Kafka is clearly the main use case here.

For Kafka, I think it is pretty rare for people to customize the hashing algorithm used to send messages. I would argue that fewer than 5% of users (I am being generous ;)) would do that. The current proposal works with Kafka's default hashing scheme, so organizations will typically never have to coordinate.

If the proposed alternative (always repartition) were side-effect free, it would make sense to adopt a design that works for 100% of users. However, repartitioning all input would not be feasible (at least at LinkedIn), as it would double the Kafka workload. If many Samza jobs read from the same Kafka topics, the increase would grow with the number of Samza jobs. For low-throughput Kafka topics, explicit repartitioning using the fluent API is surely feasible.

If the proposal were to make this new policy the default, that would clearly not make much sense. But it is an opt-in policy: if it is not applicable, people don't have to use it.

I do have some questions about the implementation. I will try to respond after spending some more time on this.

On Thu, Jun 15, 2017 at 7:53 AM, Jacob Maes <jacob.m...@gmail.com> wrote: > Thanks, Dong. > > The summary looks accurate.
> > I'll let the others chime in, as I believe my perspective has been > adequately captured in this thread. > > -Jake > > On Wed, Jun 14, 2017 at 12:12 PM, Dong Lin <lindon...@gmail.com> wrote: > > > Hey Jacob, > > > > Thank you for taking so much time to discuss this with me! I appreciate the > > discussion and the insight. I will summarize our discussion below. > > > > 1) Whether it is reasonable to store the partition-to-task mapping. > > > > We agree that this partition-to-task mapping would be reasonable if we > allow > > users to specify either the new-partition-to-old-partition mapping or > > the key-to-partition mapping in the future. SEP-5 doesn't currently provide a > > way for users to specify a new-partition-to-old-partition mapping because we > > won't have a good idea of that interface until we try to enable > > partition expansion for input systems other than Kafka in the future. This > > is currently listed as the third future-work item in SEP-5. > > > > And if we decide to implement SEP-5, I will include a warning message > > regarding the use of the partition-to-task mapping, i.e. "this does not specify the > > key-to-task mapping". We agree that this could address the concern here. > > > > 2) Whether we should follow the approach in SEP-5 or use an extra > > re-partitioning stage in the stateful Samza job to enable partition > > expansion. > > > > Here are the pros and cons of the extra re-partitioning stage in > comparison > > to SEP-5. > > > > Pros: > > - It doesn't require the owner of the Samza job to know the partitioning > > algorithm used for the input stream. If the owner of the Samza job is > in > > a different organization than the producer of the input stream, this > > solution frees the two organizations from having to coordinate with > each > > other. > > - It doesn't require the owner of the Samza job to specify the partitioning > > algorithm used for the input stream, so less config is needed.
> > > > Cons: > > - Users have to make code changes on their side to use the new fluent API. > > - The extra partitioning stage would potentially increase latency. > > - The extra partitioning stage would incur additional cost due to the > extra > > internal topic. The cost is probably small with the new trim() > API > > in Kafka if Samza uses Kafka to store the internal topic. But the cost > may > > be doubled if Samza uses another input system that doesn't provide a trim() > > API to delete data on demand. > > > > My recommendation is to adopt a hybrid solution, i.e. we still implement > > the current proposal in SEP-5 so that we enable partition expansion > without > > incurring extra latency/cost and without requiring users to change their > > code. And we can recommend that users use the extra partitioning stage if > the > > coordination among different organizations is indeed a concern. > > > > Could other developers also share their preference > > between the two? > > > > Thanks, > > Dong > > > > > > > > > > On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes <jacob.m...@gmail.com> > wrote: > > > > > Hey Dong, > > > > > > I appreciate your thoughtful responses. Let's do one more round :-) > > > > > > > > > > Here are my current concerns with the three alternatives you described > > > > earlier: > > > > - The first alternative requires support from the input system, which is > > > > currently not available. It will limit the usage of partition > expansion > > > to > > > > only systems that support such an interface. And it is not guaranteed > that > > > we > > > > can persuade the developers of the input system to add this interface. > > > This > > > > is not desirable for Samza in the long term. > > > > > > Agreed. It is very wishful thinking that each supported system would > > > provide such a contract.
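To make the "extra re-partitioning stage" alternative concrete, here is a minimal Java simulation under stated assumptions: a stateless first stage re-keys every message into a fixed number of intermediate partitions, so the downstream stateful stage's key-to-task mapping no longer depends on how many partitions the input topic has. The class, its methods, and the use of `String.hashCode()` as the routing hash are hypothetical; this is not Samza's fluent API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation of a mandatory repartition stage; not Samza's fluent API.
class RepartitionSketch {
    // Stand-in routing hash (assumption): any deterministic, non-negative key hash works.
    static int partitionFor(String key, int partitionCount) {
        return (key.hashCode() & 0x7fffffff) % partitionCount;
    }

    // Returns key -> intermediate partition. The input partition a key arrived on is
    // ignored, so expanding the input topic cannot move a key to a different
    // downstream (stateful) task. The price is that every message is written to the
    // intermediate stream once more.
    static Map<String, Integer> route(List<List<String>> inputPartitions, int intermediatePartitions) {
        Map<String, Integer> keyToIntermediate = new HashMap<>();
        for (List<String> partition : inputPartitions) {
            for (String key : partition) {
                keyToIntermediate.put(key, partitionFor(key, intermediatePartitions));
            }
        }
        return keyToIntermediate;
    }

    public static void main(String[] args) {
        // The same four keys, spread over 2 input partitions and then over 4 after expansion.
        List<List<String>> before = List.of(List.of("a", "c"), List.of("b", "d"));
        List<List<String>> after = List.of(List.of("a"), List.of("b"), List.of("c"), List.of("d"));
        // With the intermediate partition count fixed at 8, routing is identical.
        System.out.println(route(before, 8).equals(route(after, 8))); // prints true
    }
}
```

Both sides of the trade-off discussed above are visible here: the stateful stage is insulated from input expansion, but every message is produced and consumed one extra time.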
> > > > > > > > > > > > > > - I cannot comment on the second alternative because I don't > > understand > > > > how it reshuffles all existing changelog data. We can discuss more if > > > there > > > > are more specific details. My gut feel is that this will be complex and > > > > carry performance overhead. > > > > > > After giving this more thought, I agree. There is no clear way to > > migrate a > > > changelog without knowing the original key->partition mapping. Which > > leads > > > us to alternative 3... > > > > > > > > > > > > > > - The third alternative incurs performance overhead. Given that > users > > > can > > > > already use this solution to enable partition expansion, maybe Samza > > > > developers can provide more input as to why we are not doing it by > > > default. > > > > My gut feel is that it carries considerable performance overhead and > > > > increases the cost to serve Samza jobs (e.g. disk usage), which may > make > > > it > > > > undesirable in the long term. > > > > > > I think the only performance overhead would be the mandatory > > repartitioning > > > stage for stateful jobs. But a repartitioner is usually much faster > than > > > the downstream stateful job, so it seems to be only a cost-to-serve issue. > > > > > > As for why we aren't already doing this, I would posit that before the > > > introduction of the high-level API, which trivializes repartitioning, > it > > > was unreasonable to expect each job owner to do the mandatory > > > repartitioning. With the high-level API, I would argue this is much > more > > > doable. > > > > > > > > > I am not sure it is true that "any future feature that utilizes this > > > > mapping without accounting for the assumptions of this SEP is likely > to > > > > malfunction". Suppose we allow users to specify a new-to-old-partition > > > > mapping; then we can use the partition-to-task mapping correctly > > without > > > > relying on the assumption in this SEP, right?
> > > > > > Right, but my point was that the partition->task mapping is not > > sufficient > > > by itself. So adding it by itself is potentially misleading. > > > > > > On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin <lindon...@gmail.com> wrote: > > > > > > > Thanks for the reply, Jacob. Please see my comments inline. > > > > > > > > On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <jacob.m...@gmail.com> > > > wrote: > > > > > > > > > > > > > > > > - For users that need partition expansion of the input streams > for > > > > > stateful > > > > > > jobs, they have a really big headache in the sense that Samza does > > not > > > > > allow > > > > > > partition expansion for stateful jobs. SEP-5 addresses this > headache > > > for > > > > > > them. > > > > > > You are right that SEP-5 requires users to understand and enforce > > > > > > limitations across organizations. But it is still much better > than > > > not > > > > > > allowing users to expand partitions for stateful jobs at all, > > right? > > > > > Did I > > > > > > miss something here? > > > > > > > > > > I guess this one is a matter of perspective. > > > > > > > > > > One argument is that if the system supports one case, it's better > > than > > > > none > > > > > because there is one fewer scenario in which the system does the > wrong > > > > > thing. > > > > > > > > > > The counterargument is for uniform and consistent behavior, which > is > > > > easy > > > > > for users to understand and properly leverage. > > > > > > > > > > Specifically, I'd argue that the current rule is very simple: "you > > > cannot > > > > > repartition inputs on a stateful job, so you must over-partition > the > > > > > initial implementation". To me, while that rule is not ideal, its > > > > > simplicity is better than introducing a new solution that has a > bunch > > > of > > > > > caveats, any one of which could be missed. 
If any one of the > > > assumptions > > > > in > > > > > this SEP design is violated, the job would behave incorrectly.
> That > > > > puts a > > > > > lot more burden on the users than the simpler rule. > > > > > > > > > > > > > I agree that we have different perspectives here. It is true that users > > > would > > > > mess up their jobs if they used this feature incorrectly, i.e. > > violated > > > > the assumptions made in SEP-5. On the other hand, I think there is > > always > > > a > > > > way for users to mess up their jobs if they configure the Samza job > > > > incorrectly. I also think the assumptions made in this SEP are not > > > > particularly harder to understand than other existing configs in > Samza. > > > > > > > > The answer to this can be subjective. I would love to hear > perspectives > > > from > > > > other developers on this issue. > > > > > > > > > > > > > > > > > > That's why I mentioned a few alternatives that, while more complex > to > > > > > implement, would provide more consistent behavior with simple > rules > > > for > > > > > the users. > > > > > > > > > > > > > I am open to discussing alternative solutions that can address the > > > problem > > > > in a better manner. I am not opposed to complexity as long as it > gives > > us > > > > good long-term benefits. > > > > > > > > Here are my current concerns with the three alternatives you described > > > > earlier: > > > > > > > > - The first alternative requires support from the input system, which is > > > > currently not available. It will limit the usage of partition > expansion > > > to > > > > only systems that support such an interface. And it is not guaranteed > that > > > we > > > > can persuade the developers of the input system to add this interface. > > > This > > > > is not desirable for Samza in the long term. > > > > > > > > - I cannot comment on the second alternative because I don't > > understand > > > > how it reshuffles all existing changelog data. We can discuss more if > > > there > > > > are more specific details. 
My gut feel is that this will be complex and > > > > carry performance overhead.
> > > > > > > > - The third alternative incurs performance overhead. Given that > users > > > can > > > > already use this solution to enable partition expansion, maybe Samza > > > > developers can provide more input as to why we are not doing it by > > > default. > > > > My gut feel is that it carries considerable performance overhead and > > > > increases the cost to serve Samza jobs (e.g. disk usage), which may > make > > > it > > > > undesirable in the long term. > > > > > > > > > > > > > > > > > > > > > > Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum > > > > > > as well. If more grouper classes are needed in the future, > we > > > can > > > > > > solve this problem cleanly without new config. Given the > > > > > > previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey > > will > > > > > throw > > > > > > an exception unless newGrouperClass is previousGrouperClass or a subclass of > > > > > > previousGrouperClass. > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend > > > > > > GroupBySystemStreamPartition > > > > > > and GroupByPartitionWithFixedTaskNum should extend > > GroupByPartition. > > > > > Does > > > > > > this address your concern? > > > > > > > > > > Sounds workable, thanks. > > > > > > > > > > > > > > > > > Can > > > > > > you be more specific about why the partition-to-task mapping is not > > meaningful > > > > > > without > > > > > > some definition of the key-to-partition assignments and why it is > > > > > > incomplete and misleading? > > > > > > > > > > A partition is (in my naive interpretation) an independent queue > for > > > > > messages of a particular key set. It is not the *identity* of the > > > > partition > > > > > that determines the contents of the associated task's local state. > > > Rather > > > > it > > > > > is the *contents* of the partition that affect the task's state. 
A > > > > > partition-to-task mapping only captures an identity relationship: > > > > > partition1->task1.
Without the assumptions of this SEP, this is > > > > > insufficient to determine the assignment of keys to tasks, which is > > > what > > > > > really matters. Therefore, any future feature that utilizes this > > > mapping > > > > > without accounting for the assumptions of this SEP is likely to > > > > > malfunction. > > > > > > > > > > > > > > I am not sure it is true that "any future feature that utilizes this > > > > mapping without accounting for the assumptions of this SEP is likely > to > > > > malfunction". Suppose we allow users to specify a new-to-old-partition > > > > mapping; then we can use the partition-to-task mapping correctly > > without > > > > relying on the assumption in this SEP, right? > > > > > > > > > > > > > > > > > > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com> > > wrote: > > > > > > > > > > > Hey Jacob, > > > > > > > > > > > > Thanks for the explanation. It seems that your biggest concern is > > > with > > > > > the > > > > > > generality of the proposal. Let me try to address this and other > > > > comments > > > > > > below. > > > > > > > > > > > > 1) ... it will cause headaches for Samza users ... > > > > > > > > > > > > I am not sure I understand why this proposal causes headaches for > > > Samza > > > > > > users. Here is the impact of SEP-5 on users: > > > > > > > > > > > > - For users that do not need partition expansion of the input > > stream, > > > > > they > > > > > > can use Samza without any change in code/binary/config. Thus > > there > > > > is > > > > > no > > > > > > headache for them. > > > > > > > > > > > > - For users that need partition expansion of the input streams > for > > > > > > stateless jobs, they currently need to manually restart their Samza > > job > > > > in > > > > > > order to let Samza consume the new partitions created for the > > stream.
> > > > > SEP-5 > > > > > > actually reduces their headache by allowing Samza to > automatically > > > > detect > > > > > > and consume new partitions. > > > > > > > > > > > > - For users that need partition expansion of the input streams > for > > > > > stateful > > > > > > jobs, they have a really big headache in the sense that Samza does > > not > > > > > allow > > > > > > partition expansion for stateful jobs. SEP-5 addresses this > headache > > > for > > > > > > them. > > > > > > > > > > > > You are right that SEP-5 requires users to understand and enforce > > > > > > limitations across organizations. But it is still much better > than > > > not > > > > > > allowing users to expand partitions for stateful jobs at all, > > right? > > > > > Did I > > > > > > miss something here? > > > > > > > > > > > > 2) ... Separate orgs are often difficult to coordinate and a > system > > > > which > > > > > > depends on such significant process/coordination is too fragile > for > > > my > > > > > > taste ... > > > > > > > > > > > > This is true. Ideally we want a system that is fully > self-service. > > I > > > > > think > > > > > > this is a long-term goal for Samza. Still, for the reasons > > described > > > > > above, > > > > > > I think something is better than nothing. I am open to > alternative > > > > designs > > > > > > that can support partition expansion for stateful jobs without > > > > requiring > > > > > > coordination. > > > > > > > > > > > > 3) There is currently no supported way of sharing state among the > > > tasks > > > > > of > > > > > > a container. Each task has its own isolated store and that > logical > > > > > > isolation is the primary thing that enables Samza jobs to scale > > with > > > a > > > > > > simple container count change. My feeling is that we should not > > > change > > > > > > this without > > > > > > good reason. > > > > > > > > > > > > I see your point. 
I will remove this sentence from the motivation > > > > > section.
> > > > > > This won't have any impact on the design of SEP-5. Does this > > > > address > > > > > > the problem? > > > > > > > > > > > > 4) With the current proposal, we'd also need a similar check for > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any > > > other > > > > > > groupers > > > > > > were later added with both these modes, we'd probably need to add > > > those > > > > > > too. It might be easier and cleaner to add a config to ignore > that > > > > check > > > > > > temporarily. The downside is that it further complicates the Samza > > > config, > > > > > > which is already huge. Thoughts? > > > > > > > > > > > > Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum > > > > > > as well. If more grouper classes are needed in the future, > we > > > can > > > > > > solve this problem cleanly without new config. Given the > > > > > > previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey > > will > > > > > throw > > > > > > an exception unless newGrouperClass is previousGrouperClass or a subclass of > > > > > > previousGrouperClass. > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend > > > > > > GroupBySystemStreamPartition > > > > > > and GroupByPartitionWithFixedTaskNum should extend > > GroupByPartition. > > > > > Does > > > > > > this address your concern? > > > > > > > > > > > > 5) The task-to-container and container-to-host mappings are both > > > > > meaningful > > > > > > in the context of the JobModel. The partition-to-task mapping is not > > > meaningful > > > > > > without > > > > > > some definition of the key-to-partition assignments. It's > > incomplete > > > > > > information and therefore misleading. I think it only makes sense > > to > > > > use > > > > > > this mapping if we adopt a solution wherein Samza also knows the > > > > > partition > > > > > > key assignment.
> > > > > > > > > > > > The partition-to-task mapping is currently passed explicitly from the job > > coordinator > > > > to > > > > > > each task as part of the job model to tell tasks which partitions > > to > > > > > > consume from. I think we can store some definition of the > > > > > key-to-partition > > > > > > assignments if Samza decides to get and use this information in > the > > > > > > future. Can > > > > > > you be more specific about why the partition-to-task mapping is not > > meaningful > > > > > > without > > > > > > some definition of the key-to-partition assignments and why it is > > > > > > incomplete and misleading? > > > > > > > > > > > > > > > > > > Thanks, > > > > > > Dong > > > > > > > > > > > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes < > jacob.m...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > Hey Dong, > > > > > > > > > > > > > > I'm opposed (or a +0, at best) to this limited, Kafka-specific > > > > > solution. > > > > > > I > > > > > > > understand that the proposal is relatively simple to implement, > > > but I > > > > > > think > > > > > > > it will cause headaches for Samza users. They will not only > have > > to > > > > > > > understand all the limitations (increase only, double > partitions > > > > only, > > > > > > > partition using hash+modulo, etc.) of this approach, but > enforcing > > > > these > > > > > > > limitations can be a major problem, especially when the Samza > > jobs > > > > and > > > > > > > message brokers are managed by separate orgs in a company. > > Separate > > > > > orgs > > > > > > > are often difficult to coordinate and a system which depends on > > > such > > > > > > > significant process/coordination is too fragile for my taste.
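The "increase only, double partitions only, partition using hash+modulo" limitations can be checked with a little arithmetic. Kafka's default partitioner maps a keyed message to a positive murmur2 hash of its serialized key modulo the partition count; the minimal sketch below skips the real hash and works directly on non-negative hash values (an assumption), counting hashes whose new partition does not fold back onto their old partition via `newPartition % oldCount`, i.e. keys that would leave the task holding their state. Class and method names are hypothetical.

```java
// Hypothetical check of the hash+modulo expansion rule discussed in SEP-5.
class ModuloExpansion {
    // hash+modulo partitioning on an already non-negative key hash.
    static int partitionFor(int nonNegativeHash, int partitionCount) {
        return nonNegativeHash % partitionCount;
    }

    // Counts hashes in [0, hashes) whose partition after expansion, folded back with
    // newPartition % oldCount, differs from their partition before expansion.
    static int countViolations(int hashes, int oldCount, int newCount) {
        int violations = 0;
        for (int h = 0; h < hashes; h++) {
            int oldPartition = partitionFor(h, oldCount);
            int newPartition = partitionFor(h, newCount);
            if (newPartition % oldCount != oldPartition) {
                violations++;
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        System.out.println(countViolations(10_000, 4, 8)); // prints 0: 8 is a multiple of 4
        System.out.println(countViolations(10_000, 4, 6)); // prints 4998: 6 is not a multiple of 4
    }
}
```

In this arithmetic any integer multiple of the old count passes, since (h mod kN) mod N = h mod N; doubling is simply the smallest such expansion.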
> > > > > > > > > > > > > > That said, I realize that my opinion is just one of many in the > > > > broader > > > > > > > community, which may feel differently, so let me respond to some > > of > > > > the > > > > > > > other items in the discussion so we can clear them up: > > > > > > > > > > > > > > The task-to-container assignment matters because the > > correlated > > > > > tasks > > > > > > > > (i.e. tasks that consume messages with the same key) need to > > be > > > in > > > > > the > > > > > > > > same container so that they can share the same key/value > local > > > > store > > > > > on > > > > > > > the > > > > > > > > same physical machine. > > > > > > > > > > > > > > There is currently no supported way of sharing state among the > > > tasks > > > > > of a > > > > > > > container. Each task has its own isolated store and that > logical > > > > > > isolation > > > > > > > is the primary thing that enables Samza jobs to scale with a > > simple > > > > > > > container count change. My feeling is that we should not change > > > this > > > > > > > without good reason. > > > > > > > > > > > > > > I think we can hardcode new logic in > KafkaCheckpointLogKey.scala > > > such > > > > > > that > > > > > > > > an exception will not be thrown if the new grouper is > > > > > > > > GroupByPartitionWithFixedTaskNum and the old grouper is > > > > > GroupByPartition. > > > > > > > Does > > > > > > > > this look reasonable? > > > > > > > > > > > > > > With the current proposal, we'd also need a similar check for > > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if > any > > > > other > > > > > > > groupers were later added with both these modes, we'd probably > > need > > > > to > > > > > > add > > > > > > > those too. It might be easier and cleaner to add a config to > > ignore > > > > > that > > > > > > > check temporarily. The downside is that it further complicates the > > > Samza > > > > > > > config, which is already huge. 
Thoughts?
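The subclass rule Dong proposes in reply (quoted earlier in this thread) avoids a new config: each *WithFixedTaskNum grouper extends its plain counterpart, and a grouper switch is rejected unless the new grouper is the old one or a subclass of it. A minimal sketch of that rule with `Class.isAssignableFrom`, assuming empty stand-in classes: Samza's actual groupers are created through factories and the real check lives in KafkaCheckpointLogKey.scala, so every name below is illustrative.

```java
// Stand-ins for the grouper classes named in the thread (hypothetical, not Samza's classes).
class GroupByPartition {}
class GroupByPartitionWithFixedTaskNum extends GroupByPartition {}
class GroupBySystemStreamPartition {}
class GroupBySystemStreamPartitionWithFixedTaskNum extends GroupBySystemStreamPartition {}

class GrouperSwitchCheck {
    // Allow the switch only if newGrouper is previousGrouper or one of its subclasses.
    static void check(Class<?> previousGrouper, Class<?> newGrouper) {
        if (!previousGrouper.isAssignableFrom(newGrouper)) {
            throw new IllegalStateException("Incompatible grouper change: "
                + previousGrouper.getSimpleName() + " -> " + newGrouper.getSimpleName());
        }
    }

    static boolean isAllowed(Class<?> previousGrouper, Class<?> newGrouper) {
        try {
            check(previousGrouper, newGrouper);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(GroupByPartition.class, GroupByPartitionWithFixedTaskNum.class)); // true
        System.out.println(isAllowed(GroupBySystemStreamPartition.class,
            GroupBySystemStreamPartitionWithFixedTaskNum.class)); // true
        System.out.println(isAllowed(GroupByPartitionWithFixedTaskNum.class, GroupByPartition.class)); // false
    }
}
```

Because the rule is expressed through the class hierarchy, a future grouper pair only needs the right `extends` clause rather than a new config entry, and switching back to the plain grouper is still rejected.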
> > > > > > > > > > > > > > I think storing the previous task-to-partition mapping is more > > > > general > > > > > > than > > > > > > > > storing the partition count of all topics for the following > > > > reasons: > > > > > > > > - Samza already stores the task-to-container mapping and > > > > > > > container-to-host > > > > > > > > mapping in the coordinator stream. It seems consistent to > also > > > > store > > > > > > the > > > > > > > > partition-to-task mapping. And this information may be useful > > for > > > > > other > > > > > > > > use cases such as debugging. > > > > > > > > - By having the new interface take the previous > > task-to-partition > > > > > > > > assignment instead of a topic-to-partition-count mapping as a > new > > > > > > > parameter, > > > > > > > > we can potentially have grouper implementations that support > other > > > > types > > > > > > of > > > > > > > > input systems. > > > > > > > > - It is slightly simpler to store the task-to-partition > > assignment > > > > > > because > > > > > > > > we don't need to know whether this is the first time a job is > > > > started > > > > > > or > > > > > > > > not. On the other hand, you can write the > topic-to-partition-count > > > > > mapping > > > > > > to > > > > > > > > the coordinator stream only if this is the first time the job > > is > > > > run. > > > > > > > > > > > > > > The task-to-container and container-to-host mappings are both > > > > > meaningful > > > > > > in > > > > > > > the context of the JobModel. The partition-to-task mapping is not > > > meaningful > > > > > > > without some definition of the key-to-partition assignments. > It's > > > > > > > incomplete information and therefore misleading. I think it > only > > > > makes > > > > > > > sense to use this mapping if we adopt a solution wherein Samza > > also > > > > > knows > > > > > > > the partition key assignment.
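Jacob's point that a partition-to-task mapping records the *identity* of partitions, not their *contents*, can be shown with two hypothetical partitioners: hash+modulo, and the key-range assignment he mentions as a possibility for other brokers, over an assumed integer key space of [0, 1000). Both are expanded from 2 to 4 partitions and both use the same SEP-5-style mapping (new partition p goes to the task that owned old partition p % 2), yet only hash+modulo keeps the example key on the task holding its state. Everything below is illustrative, not Samza code.

```java
// Illustrative only: two hypothetical partitioners over int keys in an assumed
// key space [0, 1000). Task t is taken to own old partition t, and after expansion
// new partition p is mapped to task p % oldCount, as in the SEP-5 proposal.
class PartitionIdentityDemo {
    interface Partitioner {
        int partitionFor(int key, int partitionCount);
    }

    // hash+modulo; the non-negative int key serves as its own hash here.
    static final Partitioner HASH_MOD = (key, n) -> key % n;

    // Key-range assignment: each partition holds roughly 1000 / n consecutive keys.
    static final Partitioner RANGE = (key, n) -> key * n / 1000;

    // True if the task that owned the key's state before expansion still receives the key.
    static boolean keyStaysOnTask(Partitioner partitioner, int key, int oldCount, int newCount) {
        int taskBefore = partitioner.partitionFor(key, oldCount);
        int taskAfter = partitioner.partitionFor(key, newCount) % oldCount;
        return taskBefore == taskAfter;
    }

    public static void main(String[] args) {
        System.out.println(keyStaysOnTask(HASH_MOD, 300, 2, 4)); // prints true
        System.out.println(keyStaysOnTask(RANGE, 300, 2, 4));    // prints false
    }
}
```

With the range partitioner, key 300 moves from partition 0 (task 0) to partition 1 (task 1), so task 1 would see a key whose state lives in task 0's store, even though the partition-to-task mapping itself looks identical.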
> > > > > > > > > > > > > > -Jake > > > > > > > > > > > > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com > > > > > > wrote: > > > > > > > > > > > > > > > Hey Jacob, > > > > > > > > > > > > > > > > Thanks for taking the time to review the SEP. > > > > > > > > > > > > > > > > I agree with you and Navina that the current SEP doesn't > > provide > > > > > > support > > > > > > > for > > > > > > > > arbitrary input systems and it doesn't support partition > > shrinking. > > > I > > > > > > think > > > > > > > > the scope of this SEP is to support partition expansion for > > Kafka > > > > > (the > > > > > > > most > > > > > > > > widely used input system of Samza) and keep the door open for > > > > > partition > > > > > > > > expansion support for various input systems. The current design can > > support > > > > any > > > > > > > system > > > > > > > > that meets the two operational requirements specified in the > > doc. > > > > > > > > > > > > > > > > While it is possible to support more types of input systems, > it > > > > will > > > > > > > likely > > > > > > > > add more complexity to the design. For example, your first > > > > alternative > > > > > > > > solution requires broker-side support to negotiate the > > hash > > > > > > > algorithm. > > > > > > > > The second alternative solution requires changelog partition > > > > > reshuffling, > > > > > > > > which carries its own design complexity and performance > > overhead. > > > > > There > > > > > > > is > > > > > > > > a tradeoff between generality and complexity among > these > > > > > > choices. I > > > > > > > > like the current design because it is simple and addresses a > > big > > > > > usage > > > > > > > > scenario for us. We can add more complexity to generalize the > > > > design > > > > > if > > > > > > > it > > > > > > > > enables important use cases. Does this sound reasonable?
> > > > > > > > > > > > > > > > Note that the "Rejected Alternative" section also mentions > the > > > > > > > possibility > > > > > > > > of supporting a wider range of input systems by allowing users > > to > > > > > > specify > > > > > > > > the new-partition-to-old-partition mapping. We are not doing > it > > > > > because > > > > > > > 1) > > > > > > > > we may have a better understanding of the design after we have > a > > > > > specific > > > > > > > > second input system to support, and 2) the current design can be > > > > extended > > > > > to > > > > > > > > support general input systems. I think a similar argument can > be > > > > > applied > > > > > > > > to explain why we don't have to support general input systems > > using > > > > the > > > > > > > > potentially good alternatives you mentioned. > > > > > > > > > > > > > > > > I hope SEP-5 can be an important first step towards > supporting > > > > > > partition > > > > > > > > expansion of any input system. > > > > > > > > > > > > > > > > To answer your questions about the current proposal: > > > > > > > > > > > > > > > > >1. "An alternative solution is to allow task number to > > increase > > > > > after > > > > > > > > >partition expansion and uses a proper task-to-container > > > assignment > > > > > to > > > > > > > make > > > > > > > > >sure the Samza output is correct." What does the container > > have > > > to > > > > > do > > > > > > > with > > > > > > > > >stateful processing or output in general? > > > > > > > > > > > > > > > > The task-to-container assignment matters because the > > > correlated > > > > > > tasks > > > > > > > > (i.e. tasks that consume messages with the same key) need to > > be > > > in > > > > > the > > > > > > > > same container so that they can share the same key/value > local > > > > store > > > > > on > > > > > > > the > > > > > > > > same physical machine. > > > > > > > > > > > > > > > > >2.
When you use "Join" as an example, you basically mean > > > multiple > > > > > > > > >co-partitioned streams, right? This is opposed to multiple, > > > > > > > > >independently-partitioned streams or a single stream. It would > be > > > > nice > > > > > to > > > > > > > > >formulate the proposal in these more general terms. > > > > > > > > > > > > > > > > I thought "join" is commonly used to refer to the join > > > operation > > > > > with > > > > > > > > co-partitioned streams, but I may be wrong. I have updated the > > wiki > > > > to > > > > > > > > explicitly mention "co-partitioned stream". Does this look > > better > > > > > now? > > > > > > > > > > > > > > > > >3. When switching SSP groupers, how will the users avoid the > > > > > > > > >org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues > > > > > > > > >exception? > > > > > > > > > > > > > > > > I think we can hardcode new logic in > > KafkaCheckpointLogKey.scala > > > > such > > > > > > > that > > > > > > > > an exception will not be thrown if the new grouper is > > > > > > > > GroupByPartitionWithFixedTaskNum and the old grouper is > > > > > GroupByPartition. > > > > > > > Does > > > > > > > > this look reasonable? > > > > > > > > > > > > > > > > >4. Partition-to-task assignment is meaningless without a key-to-partition > > > > > > > > >mapping. The real semantics are captured in the external > > > > requirement > > > > > > for > > > > > > > > >partitioning via hash+modulo. But in that case, iiuc, only > the > > > > > > partition > > > > > > > > >count matters. So why not just store the original partition > > > count > > > > > > rather > > > > > > > > >than the whole mapping?
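The alternative raised in question 4 (persist only the original partition count and derive every partition's task from it) fits in a few lines. This minimal sketch assumes hash+modulo partitioning and an expansion to a multiple of the original count; the method is hypothetical rather than Samza's grouper interface, and task names of the form "Partition N" are chosen for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: derive the partition-to-task assignment from a persisted
// original partition count instead of persisting the whole mapping.
class FixedTaskNumGrouping {
    static Map<Integer, String> assign(int originalPartitionCount, int currentPartitionCount) {
        if (currentPartitionCount % originalPartitionCount != 0) {
            // Under hash+modulo, only a multiple keeps every key on its original task.
            throw new IllegalArgumentException(
                "partition count must grow by a multiple of " + originalPartitionCount);
        }
        Map<Integer, String> partitionToTask = new TreeMap<>();
        for (int p = 0; p < currentPartitionCount; p++) {
            // Partition p can only hold keys that previously lived in partition p % original.
            partitionToTask.put(p, "Partition " + (p % originalPartitionCount));
        }
        return partitionToTask;
    }

    public static void main(String[] args) {
        System.out.println(assign(2, 4)); // prints {0=Partition 0, 1=Partition 1, 2=Partition 0, 3=Partition 1}
    }
}
```

The catch, which Dong's reply below raises, is that the original count has to be written to the coordinator stream on the job's first run, so the job must be able to tell whether it is starting for the first time.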
> > > > > > > > > > > > > > > > I think storing the previous task-to-partition mapping is > more > > > > > general > > > > > > > than > > > > > > > > storing the partition count of all topics for the following > > > > reasons: > > > > > > > > > > > > > > > > - Samza already stores the task-to-container mapping and > > > > > > > container-to-host > > > > > > > > mapping in the coordinator stream. It seems consistent to > also > > > > store > > > > > > the > > > > > > > > partition-to-task mapping. And this information may be useful > > for > > > > > other > > > > > > > > use cases such as debugging. > > > > > > > > > > > > > > > > - By having the new interface take the previous > > task-to-partition > > > > > > > > assignment instead of a topic-to-partition-count mapping as a > new > > > > > > > parameter, > > > > > > > > we can potentially have grouper implementations that support > other > > > > types > > > > > > of > > > > > > > > input systems. > > > > > > > > > > > > > > > > - It is slightly simpler to store the task-to-partition > > assignment > > > > > > because > > > > > > > > we don't need to know whether this is the first time a job is > > > > started > > > > > > or > > > > > > > > not. On the other hand, you can write the > topic-to-partition-count > > > > > mapping > > > > > > to > > > > > > > > the coordinator stream only if this is the first time the job > > is > > > > run. > > > > > > > > > > > > > > > > Thanks, > > > > > > > > Dong > > > > > > > > > > > > > > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes < > > > jacob.m...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hey Dong, > > > > > > > > > > > > > > > > > > Thanks for the SEP. Supporting partition changes is > > critically > > > > > > > important > > > > > > > > > for stateful Samza jobs, so it's great to see some ideas on > > > that > > > > > > front! 
> > > > > > > > > > > > > > > > > > Sorry for the late feedback, but I have a few thoughts to > > > > > contribute.
Big +1 on Navina's comment:

> My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system and yet, we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design). I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view for this from other devs.

Two examples of this:
1. This is mostly a hypothetical, but some message brokers may use key range assignment rather than hash+modulo.
2. Kafka can't reduce the number of partitions, but other systems can. For example, it may be cheaper to reduce the number of partitions on a hosted service where the cost model depends on the number of partitions/shards.

It seems to me that a solution which doesn't depend on partition key assignment in the message broker would be preferable. Here are a few alternatives that weren't discussed and that I think should be considered.

Alternatives in order of increasing preference:
1. Samza manages the partition hash (via some new contract with the brokers) and guarantees correct routing of keys among the new partitions.
2. Samza detects a task count change, creates a new changelog with the correct partitions, *somehow* reshuffles all existing changelog data into the new topic, and then uses the new topic from then on. (This doesn't work without a changelog, but in that case durability isn't paramount, so we can just wipe the state.)
3. Use RPC between stages and have Samza fully manage key assignment among tasks. There is no on-disk topic data to clean up. Repartitioning is mandatory in the first stage, to the pre-scaled tasks in the next stage.
4. A combined 2-3 solution.

Finally, some questions about the current proposal:
1. "An alternative solution is to allow task number to increase after partition expansion and uses a proper task-to-container assignment to make sure the Samza output is correct." What does the container have to do with stateful processing or output in general?
2. When you use "Join" as an example, you basically mean multiple co-partitioned streams, right? This is opposed to multiple, independently-partitioned streams or a single stream. Would be nice to formulate the proposal in these more general terms.
3. When switching SSP groupers, how will the users avoid the org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues exception?
4. Partition to task assignment is meaningless without key to partition mapping. The real semantics are captured in the external requirement for partitioning via hash+modulo. But in that case, iiuc, only the partition count matters. So why not just store the original partition count rather than the whole mapping?

-Jake

On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Yi, Navina,

I have updated the SEP-5 document based on our discussion. The difference can be found here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>. Here is the summary of changes:

- Added a new interface that extends the existing interface SystemStreamPartitionGrouper. Newly-added grouper classes should implement this interface.
- Explained in the Rejected Alternative section why we don't add a new method to the existing interface.
- Explained in the Rejected Alternative section why we don't add a config/class for users to specify the new-partition to old-partition mapping.
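Jake's first example above, and the new-partition to old-partition mapping in Dong's list just above, can be made concrete with a small sketch. It assumes a hypothetical broker that assigns each key to an equal, contiguous slice of a 32-bit hash space instead of using hash+modulo. `zlib.crc32` is only a stand-in hash. It also assumes the new partition count is a multiple of the old one. After such a broker expands, a key's old partition is `p // factor`, not `p % N`. A grouper hard-wired to modulo would therefore send many keys to a task that does not hold their state.

```python
import zlib

HASH_SPACE = 2 ** 32  # zlib.crc32 returns an unsigned value in [0, 2**32)

def range_partition(key: bytes, num_partitions: int) -> int:
    """Hypothetical key-range broker: partition i owns the i-th equal slice of the hash space."""
    return zlib.crc32(key) * num_partitions // HASH_SPACE

def modulo_mapping(new_p: int, old_count: int, new_count: int) -> int:
    """New-to-old mapping that is correct for hash+modulo brokers."""
    return new_p % old_count

def range_mapping(new_p: int, old_count: int, new_count: int) -> int:
    """New-to-old mapping for range brokers: each old slice splits into adjacent new slices."""
    return new_p // (new_count // old_count)

def misrouted(mapping, old_count: int, new_count: int, keys) -> int:
    """Count keys whose new partition maps back to the wrong old partition."""
    return sum(
        mapping(range_partition(k, new_count), old_count, new_count)
        != range_partition(k, old_count)
        for k in keys
    )

keys = [f"member-{i}".encode() for i in range(1000)]
print("modulo mapping misroutes", misrouted(modulo_mapping, 2, 4, keys), "of", len(keys), "keys")
print("range mapping misroutes", misrouted(range_mapping, 2, 4, keys), "of", len(keys), "keys")
```

The correct mapping depends entirely on how the broker assigns keys. A configurable mapping policy, as suggested elsewhere in this thread, would have to supply that information.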
Can you take another look at the proposal and let me know if you have any concerns?

Cheers,
Dong

On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Yi,

Thanks much for the comments. I have updated the doc to address all your comments except the one related to the interface. I am not sure I understand your suggestion for the new interface. Will discuss tomorrow.

Thanks,
Dong

On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:

Hi, Dong,

Thanks for the detailed design doc for a long-awaited feature in Samza! Really appreciate it! I did a quick pass and have the following comments:

- minor: "limit the maximum size of partition" ==> "limit the maximum size of each partition"
- "However, Samza currently is not able to handle partition expansion of the input streams" ==> better to point out "for stateful jobs". For stateless jobs, simply bouncing the job can already pick up the new partitions.
- "it is possible (e.g. with Kafka) that messages with a given key exists in both partition 1 an 3. Because GroupByPartition will assign partition 1 and 3 to different tasks, messages with the same key may be handled by different task/container/process and their state will be stored in different changelog partition." The problem statement is not super clear here. The issue with stateful jobs is this: after GroupByPartition assigns partitions 1 and 3 to different tasks, the new task handling partition 3 does not have the previous state to resume the work. E.g. a page-key based counter would start from 0 in the new task for a specific key, instead of resuming from the previous count of 50 held by task 1.
- minor rewording: "the first solution in this doc" ==> "the solution proposed in this doc"
- "Thus additional development work is needed in Kafka to meet this requirement": it would be good to link to a KIP if and when it exists.
- Instead of touching/deprecating the interface SystemStreamPartitionGrouper, I would recommend a different implementation class of the interface whose constructor takes two parameters: a) the previous task number read from the coordinator stream; b) the configured new-partition to old-partition mapping policy. Then the grouper's interface method stays the same, and the grouper's behavior becomes more configurable. That helps support a broader set of use cases beyond Kafka's built-in partition expansion policies.
- Minor renaming suggestion for the new grouper class names: GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum

Thanks!
- Yi

On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Navina,

Thanks much for the comment. Please see my response below.

Regarding your biggest gripe with the SEP, I personally think the operational requirements proposed in the SEP are pretty general and could easily be enforced by other systems. The reason is that the modulo operation is pretty standard and is the default option for choosing a partition. Also, a system that supports partition expansion usually allows users to select an arbitrary partition number. Do you know of any system that does not meet these two requirements?

Regarding your comment on the Motivation section, I renamed the first section to "Problem and Goal" and specified that "*The goal of this proposal is to enable partition expansion of the input streams*."
I also added a sentence at the end of the Motivation section: "*The feature of task expansion is out of the scope of this proposal and will be addressed in a future SEP*". The second paragraph in the Motivation section mainly explains the thinking process we have gone through, the alternatives we have considered, and what we plan to do in Samza next.

You asked why increasing the partition number increases the throughput of the Kafka consumer in the container. The Kafka consumer can potentially fetch more data in one FetchResponse when there are more partitions in the FetchRequest. This is because we limit the maximum amount of data that can be fetched for a given partition in the FetchResponse. By default, this limit is 1 MB, and there are reasons why we cannot arbitrarily bump it up.
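Dong's throughput argument comes down to simple arithmetic. The sketch below assumes the 1 MB per-partition cap he describes and ignores all other limits. In Kafka's Java consumer, the per-partition cap is `max.partition.fetch.bytes`, and newer versions also cap the whole response with `fetch.max.bytes`. The optional `response_cap` parameter is a hypothetical stand-in for that kind of limit. The upper bound on one FetchResponse grows linearly with the number of partitions in the FetchRequest.

```python
from typing import Optional

MB = 1024 * 1024
PER_PARTITION_CAP = 1 * MB  # per-partition fetch limit described above (default 1 MB)

def max_bytes_per_fetch(num_partitions: int,
                        per_partition_cap: int = PER_PARTITION_CAP,
                        response_cap: Optional[int] = None) -> int:
    """Upper bound on the data one FetchResponse can carry.

    response_cap models an optional whole-response limit; None means no such limit.
    """
    bound = num_partitions * per_partition_cap
    return bound if response_cap is None else min(bound, response_cap)

for partitions in (4, 8):
    print(f"{partitions} partitions: up to {max_bytes_per_fetch(partitions) // MB} MB per fetch")
# 4 partitions: up to 4 MB per fetch
# 8 partitions: up to 8 MB per fetch
```

This is only an upper bound. It also addresses Navina's point that the total data does not change. More partitions only help a consumer that was repeatedly hitting the per-partition cap, because it can then move more data per round trip. If partitions are not backlogged, adding partitions does not raise throughput.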
You also asked how partition expansion in Kafka impacts the clients. If you subscribe the consumer to a topic, the Kafka consumer automatically detects new partitions of that topic. It then reassigns all partitions, both old and new, across the consumers in the consumer group. However, the consumer in Samza's container uses a different way of subscribing. Instead of subscribing to the topic, it subscribes to specific partitions of the topic. So if new partitions have been added, Samza needs to explicitly subscribe to them. The "Handle partition expansion while tasks are running" section in the SEP addresses this issue. Samza recalculates the job model and restarts the containers so that the consumer can subscribe to the new partitions.

I will ask other devs to take a look at the proposal. I will start the voting thread tomorrow if there are no further concerns with the SEP.

Thanks!
Dong

On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:

Hey Dong,

> I have updated the motivation section to clarify this.

Thanks for updating the motivation. Couple of notes here:

1.
> "The motivation of increasing partition number of Kafka topic includes 1) limit the maximum size of a partition in order to improve broker performance and 2) increase throughput of Kafka consumer in the Samza container."

It's unclear to me how increasing the partition number will increase the throughput of the Kafka consumer in the container. Theoretically, you will still be consuming the same amount of data in the container, whether it comes from one partition or from several expanded partitions. Can you please explain what you mean by that?

2. I believe the second paragraph under motivation is simply talking about the scope of the current SEP. It would be easier to read if it stated which solution is included in this SEP and what is left out of scope (for example, whether expansion for stateful jobs is supported).

> We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?

Yes. It does!

> I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?

It's not related. I just meant to give an example of yet another coordinator message that is persisted. Your ssp-to-task mapping follows a similar pattern for persisting. Just wanted to clarify that.
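The derivation in the quoted exchange above recovers each input stream's original partition count from the persisted mapping. One way to do it is sketched below under stated assumptions. The mapping is modeled as a plain dict from task name to (stream, partition) pairs, using GroupByPartition-style task names assumed to look like `Partition <n>`. Samza's actual coordinator-stream message format is not modeled. With a fixed task set, each new partition joins a task that already owns one of the stream's original partitions. The number of distinct tasks holding a stream therefore stays equal to that stream's original partition count.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical persisted snapshot after "page-views" grew from 2 to 4
# partitions and "clicks" grew from 4 to 8, with the task set held fixed.
mapping: Dict[str, List[Tuple[str, int]]] = {
    "Partition 0": [("page-views", 0), ("page-views", 2), ("clicks", 0), ("clicks", 4)],
    "Partition 1": [("page-views", 1), ("page-views", 3), ("clicks", 1), ("clicks", 5)],
    "Partition 2": [("clicks", 2), ("clicks", 6)],
    "Partition 3": [("clicks", 3), ("clicks", 7)],
}

def original_partition_counts(mapping: Dict[str, List[Tuple[str, int]]]) -> Dict[str, int]:
    """Each stream's original count = number of distinct tasks that hold it."""
    tasks_per_stream: Dict[str, set] = defaultdict(set)
    for task, partitions in mapping.items():
        for stream, _partition in partitions:
            tasks_per_stream[stream].add(task)
    return {stream: len(tasks) for stream, tasks in tasks_per_stream.items()}

print(original_partition_counts(mapping))
# {'page-views': 2, 'clicks': 4}
```

This relies on the task set never changing, which is the premise of the fixed-task-number approach. A grouper that adds or removes tasks would need a different derivation.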
> Can you let me know if this, together with the answers in the previous email, addresses all your questions?

Yes. I believe you have addressed most of my questions. Thanks for taking the time to do that.

> Is there specific question you have regarding partition expansion in Kafka?

I guess my questions are about how partition expansion in Kafka impacts the clients. Iiuc, partition expansions are done manually in Kafka based on the bytes-in rate of the partition. Do the existing Kafka clients handle this expansion automatically? If yes, how does it work? If not, are there plans to support it in the future?

> Thus user's job should not need to bootstrap key/value store from the changelog topic.

Why is this discussion relevant here? The key/value store and changelog topic partition are scoped to the context of a task.
Since we are not changing the number of tasks, I don't think it needs to be mentioned here.

> The new method takes the SystemStreamPartition-to-Task assignment from the previous job model which can be read from the coordinator stream.

The JobModel is currently not persisted to the coordinator stream. In your design, you talk about writing separate coordinator messages for ssp-to-task assignments. Please correct this statement, because it is misleading to the reader.

My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system and yet, we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design). I am not seeing how this can be a "general" solution for all input systems. That's my two cents.
I would like to hear alternative points of view on this from other devs.

Please make sure you have enough eyes on this SEP. If you do, please start a VOTE thread to approve this SEP.

Thanks!
Navina

On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Navina,

I have updated the wiki based on your suggestion. More specifically, I have made the following changes:

- Improved the Problem and Motivation sections to describe why we use the solution in this proposal instead of tackling the problem of task expansion directly.

- Illustrated the design in a way that doesn't bind to Kafka.
Kafka is only used as an example to illustrate why we want to support partition expansion and whether the operational requirement can be met when Kafka is the input system. Note that the proposed solution should work for any input system that meets the operational requirement described in the wiki.

- Fixed the problem in the figure.

- Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki. Together with GroupByPartitionFixedTaskNum, it should enable partition expansion for all users of the pre-defined groupers in Samza. Note that users with a custom grouper would need to update their implementation.

Can you let me know if this, together with the answers in the previous email, addresses all your questions?
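The SSP-level grouper Dong describes can be modeled in a few lines. The model also folds in Yi's earlier suggestion: the mapping policy is passed through the grouper's constructor, so the grouping method keeps its signature. This is a hypothetical Python model, not the Java class from the SEP. The `SSP` tuple, the task-name format, the `policy` parameter and `modulo_policy` are all illustrative assumptions. Each original SSP is one task. An SSP added by expansion, (system, stream, p), joins the task of (system, stream, policy(p, N)), where N is the stream's original partition count.

```python
from collections import defaultdict
from typing import Callable, Dict, List, NamedTuple

class SSP(NamedTuple):
    system: str
    stream: str
    partition: int

# A mapping policy turns (current partition, original count) into an original partition.
MappingPolicy = Callable[[int, int], int]

def modulo_policy(partition: int, original_count: int) -> int:
    """Correct for hash+modulo input systems whose count grew by a multiple."""
    return partition % original_count

class FixedTaskNumSspGrouper:
    """Hypothetical model of GroupBySystemStreamPartitionFixedTaskNum.

    The constructor carries the configuration, so group() keeps the same
    shape as an ordinary grouper.
    """

    def __init__(self, original_counts: Dict[str, int],
                 policy: MappingPolicy = modulo_policy):
        self.original_counts = original_counts
        self.policy = policy

    def group(self, ssps: List[SSP]) -> Dict[str, List[SSP]]:
        tasks: Dict[str, List[SSP]] = defaultdict(list)
        for ssp in sorted(ssps):
            original = self.policy(ssp.partition, self.original_counts[ssp.stream])
            tasks[f"{ssp.system}.{ssp.stream}.{original}"].append(ssp)
        return dict(tasks)

grouper = FixedTaskNumSspGrouper({"page-views": 2})
ssps = [SSP("kafka", "page-views", p) for p in range(4)]
for task, members in grouper.group(ssps).items():
    print(task, [m.partition for m in members])
# kafka.page-views.0 [0, 2]
# kafka.page-views.1 [1, 3]
```

A range-based policy would also need the current partition count, not just the original one. This shows why the shape of a general mapping interface is not obvious yet.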
Thanks for taking the time to review the proposal.

Regards,
Dong

On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Navina,

Thanks much for your comments. Please see my reply inline.

On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:

> Thanks for the SEP, Dong. I have a couple of questions to understand your proposal better:
>
> * Under motivation, you mention that "_We expect this solution to work similarly with other input system as well._", yet I don't see any discussion on how it will work with other input systems. That is, what kind of contract does Samza expect from other input systems?
> If we are not planning to provide a generic solution, it might be worth calling it out in the SEP.

I think the contract we expect from other systems is exactly the operational requirement mentioned in the SEP. Partitions should always be doubled, and the partition should be chosen by taking the hash modulo the number of partitions. SEP-5 should allow partition expansion for all input systems that meet these two requirements. I have updated the motivation section to clarify this.

> * I understand the partition mapping logic you have proposed. But I think the example explanation doesn't match the diagram.
> In the diagram, after expansion, partition-0 and partition-1 are pointing to bucket 0 and partition-3 and partition-4 are pointing to bucket 1. I think the former has to be partition-0 and partition-2 and the latter, is partition-1 and partition-3. If I am wrong, please help me understand the logic :)

Good catch. I will update the figure to fix this problem.

> * I don't know how partition expansion in Kafka works. I am familiar with how shard splitting happens in Kinesis: there is a hierarchical relation between the parent and child shards. This way, it will also allow the shards to be merged back. Iiuc, Kafka only supports partition "expansion", as opposed to "splits".
>>   Can you provide some context or a link on how partition expansion works in Kafka?
>
> I couldn't find any wiki on partition expansion in Kafka. The partition expansion logic in Kafka is very simple: it just adds new partitions to the existing topic. Is there a specific question you have regarding partition expansion in Kafka?
>
>> * Are you only recommending that expansion can be supported for Samza jobs that use Kafka as the input system **and** configure the SSPGrouper as GroupByPartitionFixedTaskNum? It sounds to me like this only applies to GroupByPartition. Please correct me if I am wrong. What is the expectation for custom SSP groupers?
>
> The expansion can be supported for Samza jobs if the input system meets the operational requirement mentioned above. It doesn't have to use Kafka as the input system.
>
> The current proposal provides a solution for jobs that currently use GroupByPartition. The proposal can be extended to support jobs that use the other groupers predefined in Samza. A custom SSP grouper needs to handle partition expansion similarly to how GroupByPartitionFixedTaskNum handles it, and it is the user's responsibility to update their custom grouper implementation.
>
>> * Regarding storing the SSP-to-task assignment in the coordinator stream: today, the JobModel encapsulates the data model in Samza, which also includes **TaskModels**. A TaskModel typically shows the task-to-sspList mapping.
>>   What is the reason for using a separate coordinator stream message, *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in the coordinator stream today? The reason locality exists outside of the JobModel is that *locality* information is written by each container, whereas it is consumed only by the leader JobCoordinator/AM. In this case, both the writer and the reader of the mapping information are the leader JobCoordinator/AM. So, I want to understand the motivation for this choice.
>
> Yes, the reason for using a separate coordinator stream message is that the task-to-sspList mapping is not currently persisted in the coordinator stream. We wouldn't need to create this new message if the JobModel were persisted.
> We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partitions have been expanded. Does this make sense?
>
> I am not sure how this is related to locality, though. Could you clarify your question if I haven't answered it?
>
> Thanks!
> Dong
>
>> Cheers!
>> Navina
>>
>> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> We created SEP-5: Enable partition expansion of input streams. Please find the SEP wiki at
>>> https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
>>>
>>> Your feedback is appreciated!
>>>
>>> Thanks,
>>> Dong

-- 
We are hiring in Streams Infra (Kafka/Samza/Datastream) !!
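A minimal Java sketch of the operational contract discussed in this thread, assuming (1) the producer's partitioner computes the partition as a non-negative key hash modulo the partition count, and (2) the partition count only ever doubles. The class and method names (PartitionExpansionSketch, partitionFor, originalPartition, groupByOriginal) are hypothetical and are not Samza or Kafka APIs. It also reproduces the corrected diagram mapping: after expanding from 2 to 4 partitions, partition-0 and partition-2 fall into bucket 0, and partition-1 and partition-3 into bucket 1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionExpansionSketch {

    // ASSUMPTION: the partitioner maps a key hash to hash mod partitionCount.
    // Math.floorMod keeps the result non-negative even for negative hashes.
    static int partitionFor(int keyHash, int partitionCount) {
        return Math.floorMod(keyHash, partitionCount);
    }

    // ASSUMPTION: the partition count has only ever doubled, so it is a
    // multiple of the original count. A current partition then maps back
    // to the original partition ("bucket") it was split from.
    static int originalPartition(int partition, int originalPartitionCount) {
        return partition % originalPartitionCount;
    }

    // Groups the current partitions by the original bucket they belong to.
    static Map<Integer, List<Integer>> groupByOriginal(int currentCount, int originalCount) {
        Map<Integer, List<Integer>> groups = new TreeMap<>();
        for (int p = 0; p < currentCount; p++) {
            groups.computeIfAbsent(originalPartition(p, originalCount), k -> new ArrayList<>())
                  .add(p);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Expanding from 2 to 4 partitions: prints {0=[0, 2], 1=[1, 3]}
        System.out.println(groupByOriginal(4, 2));

        // Every key hash keeps its original bucket after doubling 2 -> 4 -> 8.
        for (int h = -1000; h <= 1000; h++) {
            int before = partitionFor(h, 2);
            int after = partitionFor(h, 8);
            if (originalPartition(after, 2) != before) {
                throw new AssertionError("bucket changed for hash " + h);
            }
        }
        System.out.println("key-to-bucket mapping preserved");
    }
}
```

Because the new count is a multiple of the original, (h mod 8) mod 2 equals h mod 2 for every h, so each key stays in its original bucket, and a grouper that assigned tasks by partition mod original count would keep key-to-task affinity. With a count that is not a multiple of the original (say 2 to 3), this no longer holds in general: a hash of 3 lands in bucket 1 before expansion but in partition 0, i.e. bucket 0, after.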