Thanks, Dong. The summary looks accurate.
I'll let the others chime in, as I believe my perspective has been adequately captured in this thread.

-Jake

On Wed, Jun 14, 2017 at 12:12 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jacob,
>
> Thank you for taking so much time to discuss with me! I appreciate the discussion and the insight. I will summarize our discussion below.
>
> 1) Whether it is reasonable to store partition-to-task mapping.
>
> We agree that this partition-to-task mapping will be reasonable if we allow user to specify either the new-partition-to-old-partition mapping or key-to-partition mapping in the future. SEP-5 doesn't currently provide a way for user to specify new-partition-to-old-partition mapping because we don't have a good idea about that interface until we try to enable partition expansion for input systems other than Kafka in the future. This is currently specified as the third future work in SEP-5.
>
> And if we decide to implement SEP-5, I will include a warning message regarding the use of partition-to-task, i.e. "this does not specify the key-to-task mapping". We agree that this could address the concern here.
>
> 2) Whether we should follow the approach in SEP-5 or use an extra re-partitioning stage in the stateful Samza job to enable partition expansion.
>
> Here are the pros and cons of the extra re-partitioning stage in comparison to SEP-5.
>
> Pros:
> - It doesn't require owner of the Samza job to know the partitioning algorithm used for the input stream. If the owner of the Samza job is in a different organization than the producer of the input stream, this solution frees different organizations from having to coordinate with each other.
> - It doesn't require owner of the Samza job to specify the partitioning algorithm used for the input stream. Thus less config.
>
> Cons:
> - User has to make code change on their side to use the new fluent API.
> - The extra partitioning stage would potentially increase latency.
> - The extra partitioning stage would incur additional cost due to the extra internal topic. The cost is probably not that much with the new trim() API in Kafka if Samza uses Kafka to store the internal topic. But the cost may be doubled if Samza uses another input system that doesn't provide trim() API to delete data on demand.
>
> My recommendation is to adopt a hybrid solution, i.e. we still implement the current proposal in SEP-5 so that we enable partition expansion without incurring extra latency/cost and without requiring users to change their code. And we can recommend user to use the extra partitioning stage if the coordination among different organizations is indeed a concern.
>
> Can other developers also provide feedback regarding your preference between the two?
>
> Thanks,
> Dong
>
> On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>
> > Hey Dong,
> >
> > I appreciate your thoughtful responses. Let's do one more round :-)
> >
> > > Here are my current concerns with the three alternatives you described earlier:
> > > - The first alternative requires support from input system which is currently not available. It will limit the usage of partition expansion to only systems that support such interface. And it is not guaranteed that we can persuade the developer of the input system to add this interface. This is not desirable for Samza in the long term.
> >
> > Agreed. It is very wishful thinking that each supported system would provide such a contract.
> >
> > > - I cannot comment on the second alternative because I don't understand how it reshuffles all existing changelog data. We can discuss more if there is more specific detail. My gut feel is that this will be complex and carries performance overhead.
> >
> > After giving this more thought, I agree. There is no clear way to migrate a changelog without knowing the original key->partition mapping. Which leads us to alternative 3...
> >
> > > - The third alternative requires performance overhead. Given that user can already use this solution to enable partition expansion, maybe Samza developers can provide more input as to why we are not doing it by default. My gut feel is that it carries considerable performance overhead and increases the cost-to-serve Samza job (e.g. disk usage), which may make it undesirable in the long term.
> >
> > I think the only performance overhead would be the mandatory repartitioning stage for stateful jobs. But a repartitioner is usually much faster than the downstream stateful job, so it only seems a cost-to-serve issue.
> >
> > As for why we aren't already doing this, I would posit that before the introduction of the high level API, which trivializes repartitioning, it was unreasonable to expect each job owner to do the mandatory repartitioning. With the high level API, I would argue this is much more doable.
> >
> > > I am not sure it is true that "any future feature that utilizes this mapping without accounting for the assumptions of this SEP is likely to malfunction". Suppose we allow user to specify new-to-old-partition mapping, then we can use the partition-to-task mapping correctly without relying on the assumption in this SEP, right?
> >
> > Right, but my point was that the partition->task mapping is not sufficient by itself. So adding it by itself is potentially misleading.
> >
> > On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Thanks for the reply Jacob. Please see my comment inline.
> > >
> > > On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <jacob.m...@gmail.com> wrote:
> > >
> > > > > - For users that need partition expansion of the input streams for stateful job, they have a really big headache in the sense that Samza does not allow partition expansion for stateful job. SEP-5 addresses this headache for them.
> > > > >
> > > > > You are right that SEP-5 requires user to understand and enforce limitations across organizations. But it is still much better than not allowing user to expand partitions for stateful jobs at all, right? Did I miss something here?
> > > >
> > > > I guess this one is a matter of perspective.
> > > >
> > > > One argument is that if the system supports one case, it's better than none because there is one less scenario in which the system does the wrong thing.
> > > >
> > > > The counter argument is for uniform and consistent behavior, which is easy for users to understand and properly leverage.
> > > >
> > > > Specifically, I'd argue that the current rule is very simple: "you cannot repartition inputs on a stateful job, so you must over-partition the initial implementation". To me, while that rule is not ideal, its simplicity is better than introducing a new solution that has a bunch of caveats, any one of which could be missed. If any one of the assumptions in this SEP design is violated, the job would behave incorrectly. That puts a lot more burden on the users than the simpler rule.
> > >
> > > I agree that we have different perspectives here. It is true that user would mess up their job if they used this feature in a wrong way, i.e. violate the assumption made in SEP-5. On the other hand, I think there is always a way for user to mess up their job if they configure the Samza job incorrectly. I also think the assumption made in this SEP is not particularly harder to understand than other existing configs in Samza.
> > >
> > > The answer to this can be subjective. I would love to hear perspective from other developers on this issue.
> > >
> > > > That's why I mentioned a few alternatives that, while more complex to implement, would provide a more consistent behavior with simple rules for the users.
> > >
> > > I am open to discuss alternative solutions that can address the problem in a better manner. I am not opposed to complexity as long as it gives us good long term benefits.
> > >
> > > Here are my current concerns with the three alternatives you described earlier:
> > >
> > > - The first alternative requires support from input system which is currently not available. It will limit the usage of partition expansion to only systems that support such interface. And it is not guaranteed that we can persuade the developer of the input system to add this interface. This is not desirable for Samza in the long term.
> > >
> > > - I cannot comment on the second alternative because I don't understand how it reshuffles all existing changelog data. We can discuss more if there is more specific detail. My gut feel is that this will be complex and carries performance overhead.
> > >
> > > - The third alternative requires performance overhead. Given that user can already use this solution to enable partition expansion, maybe Samza developers can provide more input as to why we are not doing it by default. My gut feel is that it carries considerable performance overhead and increases the cost-to-serve Samza job (e.g. disk usage), which may make it undesirable in the long term.
> > >
> > > > > Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well. If there are more grouper classes needed in the future, we can solve this problem cleanly without new config. Given the previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will throw exception if and only if newGrouperClass is an instance of previousGrouperClass. GroupBySystemStreamPartitionWithFixedTaskNum should extend GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum should extend GroupByPartition. Does this address your concern?
> > > >
> > > > Sounds workable, thanks.
> > > >
> > > > > Can you be more specific why Partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments and why it is incomplete and misleading?
> > > >
> > > > A partition is (in my naive interpretation) an independent queue for messages of a particular key set. It is not the *identity* of the partition that determines the contents of the associated task's local state. Rather it is the *contents* of the partition that affect the task's state. A partition-to-task mapping only captures an identity relationship: partition1->task1. Without the assumptions of this SEP, this is insufficient to determine the assignment of keys to tasks, which is what really matters. Therefore, any future feature that utilizes this mapping without accounting for the assumptions of this SEP is likely to malfunction.
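The distinction drawn above between a partition's identity and its contents can be made concrete with a small sketch. It assumes the hash+modulo partitioning that the thread names as an SEP-5 requirement; the class and method names (`PartitionExpansionSketch`, `partitionFor`, `oldPartitionFor`, `mappingPreserved`) are hypothetical and are not part of Samza or Kafka. Under that assumption, when the partition count grows from N to a multiple of N, every key that lands in new partition p previously lived in old partition p mod N, so p can be routed to the task that already holds that key's state. Without the assumption, the partition-to-task mapping alone says nothing about where keys go.

```java
final class PartitionExpansionSketch {

    // Assumption: the producer picks a partition as hash(key) mod partitionCount.
    static int partitionFor(int keyHash, int partitionCount) {
        return Math.floorMod(keyHash, partitionCount);
    }

    // Old partition whose keys now land in newPartition. Only meaningful when
    // the new partition count is a multiple of the old one.
    static int oldPartitionFor(int newPartition, int oldPartitionCount) {
        return newPartition % oldPartitionCount;
    }

    // True if the key's new partition maps back to the partition it used
    // before the expansion.
    static boolean mappingPreserved(int keyHash, int oldCount, int newCount) {
        int before = partitionFor(keyHash, oldCount);
        int after = partitionFor(keyHash, newCount);
        return oldPartitionFor(after, oldCount) == before;
    }

    public static void main(String[] args) {
        // Doubling 4 -> 8: the relationship holds for any hash value.
        System.out.println(mappingPreserved("user-42".hashCode(), 4, 8));
        // 4 -> 6 is not a multiple: a hash of 6 moves from partition 2 to
        // partition 0, and 0 mod 4 does not point back to partition 2.
        System.out.println(mappingPreserved(6, 4, 6));
    }
}
```

The second call illustrates why the thread lists "double partitions only" and "hash+modulo" among the limitations: once either assumption is dropped, the old owner of a key can no longer be derived from the partition number.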
> > >
> > > I am not sure it is true that "any future feature that utilizes this mapping without accounting for the assumptions of this SEP is likely to malfunction". Suppose we allow user to specify new-to-old-partition mapping, then we can use the partition-to-task mapping correctly without relying on the assumption in this SEP, right?
> > >
> > > > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > >
> > > > > Hey Jacob,
> > > > >
> > > > > Thanks for the explanation. It seems that your biggest concern is with the generality of the proposal. Let me try to address this and other comments below.
> > > > >
> > > > > 1) ... it will cause headaches for Samza users ...
> > > > >
> > > > > I am not sure I understand why this proposal causes headache for Samza users. Here is the impact of the SEP-5 on users:
> > > > >
> > > > > - For users that do not need partition expansion of the input stream, they can use Samza without any change in code/binary/config. Thus there is no headache for them.
> > > > >
> > > > > - For users that need partition expansion of the input streams for stateless job, they currently need to manually reboot their Samza job in order to let Samza consume the new partitions created for the stream. SEP-5 actually reduced their headache by allowing Samza to automatically detect and consume new partitions.
> > > > >
> > > > > - For users that need partition expansion of the input streams for stateful job, they have a really big headache in the sense that Samza does not allow partition expansion for stateful job. SEP-5 addresses this headache for them.
> > > > >
> > > > > You are right that SEP-5 requires user to understand and enforce limitations across organizations. But it is still much better than not allowing user to expand partitions for stateful jobs at all, right? Did I miss something here?
> > > > >
> > > > > 2) ... Separate orgs are often difficult to coordinate and a system which depends on such significant process/coordination is too fragile for my taste ..
> > > > >
> > > > > This is true. Ideally we want a system that is fully self-serving. I think this is a long term goal for Samza. Still, for the reasons described above, I think something is better than nothing. I am open to alternative design that can support partition expansion for stateful jobs without requiring coordination.
> > > > >
> > > > > 3) There is currently no supported way of sharing state among the tasks of a container. Each task has its own isolated store and that logical isolation is the primary thing that enables Samza jobs to scale with a simple container count change. My feeling is that we should not change this without good reason.
> > > > >
> > > > > I see your point. I will remove this sentence from the motivation section. This won't have any impact on the design of the SEP-5. Does this address the problem?
> > > > >
> > > > > 4) With the current proposal, we'd also need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other groupers were later added with both these modes, we'd probably need to add those too. It might be easier and cleaner to add a config to ignore that check temporarily. Down side is that it further complicates the Samza config, which is already huge. Thoughts?
> > > > >
> > > > > Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well. If there are more grouper classes needed in the future, we can solve this problem cleanly without new config. Given the previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will throw exception if and only if newGrouperClass is an instance of previousGrouperClass. GroupBySystemStreamPartitionWithFixedTaskNum should extend GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum should extend GroupByPartition. Does this address your concern?
> > > > >
> > > > > 5) The task-to-container and container-to-host mappings are both meaningful in context of the JobModel. Partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments. It's incomplete information and therefore misleading. I think it only makes sense to use this mapping if we adopt a solution wherein Samza also knows the partition key assignment.
> > > > >
> > > > > Partition-to-task is currently explicitly passed from job coordinator to each task as part of the job model to tell tasks which partitions to consume from. I think we can store some definition of the key-to-partition assignments if Samza decides to get and use this information in the future. Can you be more specific why Partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments and why it is incomplete and misleading?
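The grouper check discussed under point 4 above could look roughly like the sketch below. It assumes the intended rule is that a grouper change is accepted only when the new grouper class is the same as, or a subclass of, the previous one. The sentence above literally says the exception is thrown "if and only if newGrouperClass is an instance of previousGrouperClass", but the surrounding discussion (no exception when moving from GroupByPartition to GroupByPartitionWithFixedTaskNum) suggests the opposite case is meant, so treat this reading as an assumption. The nested classes are empty stand-ins that only mirror the proposed inheritance, and `GrouperChangeCheck` is a hypothetical helper, not code from KafkaCheckpointLogKey.

```java
final class GrouperChangeCheck {

    // Empty stand-ins for the groupers named in the thread; only the
    // inheritance relationship matters for this sketch.
    static class GroupByPartition {}
    static class GroupByPartitionWithFixedTaskNum extends GroupByPartition {}
    static class GroupBySystemStreamPartition {}
    static class GroupBySystemStreamPartitionWithFixedTaskNum extends GroupBySystemStreamPartition {}

    // Assumed rule: a change is allowed when the new grouper is the previous
    // class itself or one of its subclasses.
    static boolean isAllowedChange(Class<?> previousGrouper, Class<?> newGrouper) {
        return previousGrouper.isAssignableFrom(newGrouper);
    }

    // Rejects any other change. A generic exception is used here in place of
    // the Samza-specific one mentioned in the thread.
    static void validate(Class<?> previousGrouper, Class<?> newGrouper) {
        if (!isAllowedChange(previousGrouper, newGrouper)) {
            throw new IllegalStateException("Grouper changed from "
                + previousGrouper.getSimpleName() + " to " + newGrouper.getSimpleName());
        }
    }

    public static void main(String[] args) {
        // Accepted: the fixed-task-num grouper extends the previous grouper.
        validate(GroupByPartition.class, GroupByPartitionWithFixedTaskNum.class);
        try {
            // Rejected: unrelated grouper families.
            validate(GroupByPartition.class, GroupBySystemStreamPartition.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Keying the check on inheritance rather than on a hardcoded pair of class names is what lets a future grouper pair work without a new config, which is the point made in the reply to item 4.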
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <jacob.m...@gmail.com> wrote:
> > > > >
> > > > > > Hey Dong,
> > > > > >
> > > > > > I'm opposed (or a +0, at best) to this limited, Kafka-specific solution. I understand that the proposal is relatively simple to implement, but I think it will cause headaches for Samza users. They will not only have to understand all the limitations (increase only, double partitions only, partition using hash+modulo, etc) of this approach, but enforcing these limitations can be a major problem, especially when the Samza jobs and message brokers are managed by separate orgs in a company. Separate orgs are often difficult to coordinate and a system which depends on such significant process/coordination is too fragile for my taste.
> > > > > >
> > > > > > That said, I realize that my opinion is just one of many in the broader community which may feel differently, so let me respond to some of the other items in the discussion so we can clear them up:
> > > > > >
> > > > > > > The task-to-container assignment matters because the correlated tasks (i.e. tasks that consume messages with the same key) need to be in the same container so that they can share the same key/value local store on the same physical machine.
> > > > > >
> > > > > > There is currently no supported way of sharing state among the tasks of a container. Each task has its own isolated store and that logical isolation is the primary thing that enables Samza jobs to scale with a simple container count change. My feeling is that we should not change this without good reason.
> > > > > >
> > > > > > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that exception will not be thrown if new grouper is GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition. Does this look reasonable?
> > > > > >
> > > > > > With the current proposal, we'd also need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other groupers were later added with both these modes, we'd probably need to add those too. It might be easier and cleaner to add a config to ignore that check temporarily. Down side is that it further complicates the Samza config, which is already huge. Thoughts?
> > > > > >
> > > > > > > I think storing the previous task-to-partition mapping is more general than storing the partition count of all topics for the following reasons:
> > > > > > > - Samza already stores the task-to-container mapping and container-to-host mapping in the coordinator stream. It seems consistent to also store the partition-to-task mapping. And this information may be useful for other use-case such as debugging.
> > > > > > > - By having the new interface take the previous task-to-partition assignment instead of a topic-to-partition-count mapping as new parameter, we can potentially have grouper implementation to support other types of input systems.
> > > > > > > - It is slightly simpler to store the task-to-partition assignment because we don't need to know whether this is the first time a job is started or not. On the other hand, you can write topic-to-partition-count mapping to the coordinator stream only if this is the first time the job is run
> > > > > >
> > > > > > The task-to-container and container-to-host mappings are both meaningful in context of the JobModel. Partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments. It's incomplete information and therefore misleading. I think it only makes sense to use this mapping if we adopt a solution wherein Samza also knows the partition key assignment.
> > > > > >
> > > > > > -Jake
> > > > > >
> > > > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey Jacob,
> > > > > > >
> > > > > > > Thanks for taking time to review the SEP.
> > > > > > >
> > > > > > > I agree with you and Navina that the current SEP doesn't provide support to arbitrary input systems and it doesn't support partition shrink. I think the scope of this SEP is to support partition expansion for Kafka (the most widely used input system of Samza) and keep the door open for partition support of various input systems. The current design can support any system that meets the two operational requirement specified in the doc.
> > > > > > >
> > > > > > > While it is possible to support more types of input systems, it will likely add more complexity to the design. For example, the first alternative solution from you requires broker-side support to negotiate hash algorithm.
> > > > > > > The second alternative solution requires changelog partition reshuffle which carries its own design complexity and performance overhead. There is tradeoff between the generality and the complexity among these choices. I like the current design because it is simple and addresses a big usage scenario for us. We can add more complexity to generalize the design if it enables important use-case. Does this sound reasonable?
> > > > > > >
> > > > > > > Note that the "Rejected Alternative" section also mentions the possibility of supporting a wider range of input systems by allowing user to specify the new-partition to old-partition mapping. We are not doing it because 1) we may have better understanding of the design after we have a specific second input system to support 2) the current design can be extended to support general input systems. I think similar argument can be applied to explain why we don't have to support general input systems using the potentially-good alternatives you mentioned.
> > > > > > >
> > > > > > > I hope SEP-5 can be an important first-step towards supporting partition expansion of any input system.
> > > > > > >
> > > > > > > To answer your questions about the current proposal:
> > > > > > >
> > > > > > > >1. "An alternative solution is to allow task number to increase after partition expansion and uses a proper task-to-container assignment to make sure the Samza output is correct." What does the container have to do with stateful processing or output in general?
> > > > > > >
> > > > > > > The task-to-container assignment matters because the correlated tasks (i.e. tasks that consume messages with the same key) need to be in the same container so that they can share the same key/value local store on the same physical machine.
> > > > > > >
> > > > > > > >2. When you use "Join" as an example, you basically mean multiple co-partitioned streams, right? This is opposed to multiple, independently-partitioned streams or a single stream. Would be nice to formulate the proposal in these more general terms.
> > > > > > >
> > > > > > > I thought "join" is commonly used to refer to the join operation with co-partitioned stream but I may be wrong. I have updated the wiki to explicitly mention "co-partitioned stream". Does this look better now?
> > > > > > >
> > > > > > > >3. When switching SSP groupers, how will the users avoid the org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues exception?
> > > > > > >
> > > > > > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that exception will not be thrown if new grouper is GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition. Does this look reasonable?
> > > > > > >
> > > > > > > >4. Partition to task assignment is meaningless without key to partition mapping. The real semantics are captured in the external requirement for partitioning via hash+modulo. But in that case, iiuc, only the partition count matters. So why not just store the original partition count rather than the whole mapping?
> > > > > > >
> > > > > > > I think storing the previous task-to-partition mapping is more general than storing the partition count of all topics for the following reasons:
> > > > > > >
> > > > > > > - Samza already stores the task-to-container mapping and container-to-host mapping in the coordinator stream. It seems consistent to also store the partition-to-task mapping. And this information may be useful for other use-case such as debugging.
> > > > > > >
> > > > > > > - By having the new interface take the previous task-to-partition assignment instead of a topic-to-partition-count mapping as new parameter, we can potentially have grouper implementation to support other types of input systems.
> > > > > > >
> > > > > > > - It is slightly simpler to store the task-to-partition assignment because we don't need to know whether this is the first time a job is started or not. On the other hand, you can write topic-to-partition-count mapping to the coordinator stream only if this is the first time the job is run
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hey Dong,
> > > > > > > >
> > > > > > > > Thanks for the SEP. Supporting partition changes is critically important for stateful Samza jobs, so it's great to see some ideas on that front!
> > > > > > > >
> > > > > > > > Sorry for the late feedback, but I have a few thoughts to contribute.
> > > > > > > > > > > > > > > > Big +1 on Navina's comment: > > > > > > > > > > > > > > > > > My biggest gripe with this SEP is that it seems like a > > > > tailor-made > > > > > > > > > solution > > > > > > > > > that relies on the semantics of the Kafka system and yet, > we > > > are > > > > > > trying > > > > > > > > to > > > > > > > > > masquerade that as operational requirements for other > systems > > > > > > > interacting > > > > > > > > > with Samza. (Not to say that this is the first time such a > > > choice > > > > > is > > > > > > > > being > > > > > > > > > made in the Samza design). I am not seeing how this can a > > > > "general" > > > > > > > > > solution for all input systems. That's my two cents. I > would > > > like > > > > > to > > > > > > > hear > > > > > > > > > alternative points of view for this from other devs. > > > > > > > > > > > > > > > > > > > > > > > > Two examples of this: > > > > > > > > 1. This is mostly a hypothetical, but some message brokers > may > > > use > > > > > key > > > > > > > > range assignment rather than hash+modulo. > > > > > > > > 2. Kafka can't reduce the number of partitions, but it can > > happen > > > > on > > > > > > > other > > > > > > > > systems. For example, it may be cheaper to reduce the number > of > > > > > > > partitions > > > > > > > > on a hosted service where the cost model depends on the > number > > of > > > > > > > > partitions/shards. > > > > > > > > > > > > > > > > It seems to me that a solution which doesn't depend on > > partition > > > > key > > > > > > > > assignment in the message broker. Here are a few alternatives > > > that > > > > > > > weren't > > > > > > > > discussed and I think should be considered: > > > > > > > > > > > > > > > > Alternatives in order of increasing preference: > > > > > > > > 1. Samza manages the partition hash (via some new contract > with > > > the > > > > > > > > brokers) and guarantees correct routing of keys among the new > > > > > > partitions. 
> > > > > > > > 2. Samza detects a task count change, creates a new changelog > > > with > > > > > > > correct > > > > > > > > partitions, and *somehow* reshuffles all existing changelog > > data > > > > into > > > > > > the > > > > > > > > new topic and then uses the new topic from then on. (doesn't > > work > > > > > > without > > > > > > > > changelog, but in that case durability isn't paramount, so we > > can > > > > > just > > > > > > > > wipe) > > > > > > > > 3. Use RPC in between stages and samza fully manages key > > > assignment > > > > > > among > > > > > > > > tasks. No on-disk topic data to clean up. Mandatory > > > repartitioning > > > > in > > > > > > the > > > > > > > > first stage to pre-scaled tasks in next stage. > > > > > > > > 4. Combined 2-3 solution > > > > > > > > > > > > > > > > Finally, some questions about the current proposal: > > > > > > > > 1. "An alternative solution is to allow task number to > increase > > > > after > > > > > > > > partition expansion and uses a proper task-to-container > > > assignment > > > > to > > > > > > > make > > > > > > > > sure the Samza output is correct." What does the container > have > > > to > > > > do > > > > > > > with > > > > > > > > stateful processing or output in general? > > > > > > > > 2. When you use "Join" as an example, you basically mean > > multiple > > > > > > > > co-partitioned streams, right? This is opposed to multiple, > > > > > > > > independently-partitioned streams or a single stream. Would > be > > > nice > > > > > to > > > > > > > > formulate the proposal in these more general terms. > > > > > > > > 3. When switching SSP groupers, how will the users avoid the > > > > > > > > org.apache.samza.checkpoint.kafka.DifferingSystemStreamParti > > > > > > > > tionGrouperFactoryValues > > > > > > > > exception? > > > > > > > > 4. Partition to task assignment is meaningless without key to > > > > > partition > > > > > > > > mapping. 
> > > > > > > > The real semantics are captured in the external requirement for partitioning via hash+modulo. But in that case, iiuc, only the partition count matters. So why not just store the original partition count rather than the whole mapping?
> > > > > > > > -Jake
> > > > > > > > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > Hey Yi, Navina,
> > > > > > > > > I have updated the SEP-5 document based on our discussion. The difference can be found here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>. Here is the summary of changes:
> > > > > > > > > - Added a new interface that extends the existing interface SystemStreamPartitionGrouper. Newly-added grouper classes should implement this interface.
> > > > > > > > > - Explained in the Rejected Alternative Section why we don't add a new method to the existing interface.
> > > > > > > > > - Explained in the Rejected Alternative Section why we don't add a config/class for the user to specify the new-partition to old-partition mapping.
> > > > > > > > > Can you take another look at the proposal and let me know if there is any concern?
> > > > > > > > > Cheers,
> > > > > > > > > Dong
> > > > > > > > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > > Hey Yi,
> > > > > > > > > > Thanks much for the comment. I have updated the doc to address all your comments except the one related to the interface. I am not sure I understand your suggestion of the new interface. Will discuss tomorrow.
> > > > > > > > > > Thanks,
> > > > > > > > > > Dong
> > > > > > > > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
> > > > > > > > > >> Hi, Dong,
> > > > > > > > > >> Thanks for the detailed design doc for a long-awaited feature in Samza! Really appreciate it! I did a quick pass and have the following comments:
> > > > > > > > > >> - minor: "limit the maximum size of partition" ==> "limit the maximum size of each partition"
> > > > > > > > > >> - "However, Samza currently is not able to handle partition expansion of the input streams" ==> better point out "for stateful jobs". For stateless jobs, simply bouncing the job now can pick up the new partitions.
> > > > > > > > > >> - "it is possible (e.g. with Kafka) that messages with a given key exists in both partition 1 and 3.
> > > > > > > > > >> Because GroupByPartition will assign partition 1 and 3 to different tasks, messages with the same key may be handled by different task/container/process and their state will be stored in different changelog partition." The problem statement is not super clear here. The issue with stateful jobs is: after GroupByPartition assigns partitions 1 and 3 to different tasks, the new task handling partition 3 does not have the previous state to resume the work. E.g. a page-key based counter would start from 0 in the new task for a specific key, instead of resuming the previous count 50 held by task 1.
> > > > > > > > > >> - minor rewording: "the first solution in this doc" ==> "the solution proposed in this doc"
> > > > > > > > > >> - "Thus additional development work is needed in Kafka to meet this requirement" It would be good to link to a KIP if and when it exists.
> > > > > > > > > >> - Instead of touching/deprecating the interface SystemStreamPartitionGrouper, I would recommend having a different implementation class of the interface whose constructor takes two parameters: a) the previous task number read from the coordinator stream; b) the configured new-partition to old-partition mapping policy.
> > > > > > > > > >> Then, the grouper's interface method stays the same and the behavior of the grouper is more configurable, which is good for supporting a broader set of use cases in addition to Kafka's built-in partition expansion policies.
> > > > > > > > > >> - Minor renaming suggestion to the new grouper class names: GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum
> > > > > > > > > >> Thanks!
> > > > > > > > > >> - Yi
> > > > > > > > > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > >> > Hey Navina,
> > > > > > > > > >> > Thanks much for the comment. Please see my response below.
> > > > > > > > > >> > Regarding your biggest gripe with the SEP, I personally think the operational requirements proposed in the SEP are pretty general and could be easily enforced by other systems. The reason is that the modulo operation is pretty standard and is the default option when we choose a partition. And usually the underlying system allows the user to select an arbitrary partition number if it supports partition expansion. Do you know any system that does not meet these two requirements?
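The two operational requirements discussed in this thread (hash+modulo partitioning, and expansion to a multiple of the original partition count) can be checked with a small sketch. This is only an illustration: the function names are hypothetical and are not Samza or Kafka APIs, and plain integers stand in for whatever key hash the broker's partitioner computes.

```python
def partition_of(key_hash: int, partition_count: int) -> int:
    """Hash+modulo assignment: the partitioner behaviour the requirement assumes."""
    return key_hash % partition_count


def task_of(partition: int, original_partition_count: int) -> int:
    """Fold a (possibly new) partition back onto one of the original tasks."""
    return partition % original_partition_count


def keys_stay_on_task(original_count: int, expanded_count: int, key_hashes) -> bool:
    """True if every key lands on the same task before and after expansion."""
    for key_hash in key_hashes:
        before = task_of(partition_of(key_hash, original_count), original_count)
        after = task_of(partition_of(key_hash, expanded_count), original_count)
        if before != after:
            return False
    return True


if __name__ == "__main__":
    hashes = range(1000)
    print(keys_stay_on_task(4, 8, hashes))  # doubling: True
    print(keys_stay_on_task(4, 6, hashes))  # not a multiple of 4: False
```

The check passes whenever the expanded count is a whole multiple of the original count, because (h mod kN) mod N equals h mod N. A broker that assigns keys by range rather than by modulo, as in the hypothetical raised earlier in the thread, would need a different folding rule.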
> > > > > > > > > >> > Regarding your comment on the Motivation section, I renamed the first section as "Problem and Goal" and specified that "*The goal of this proposal is to enable partition expansion of the input streams*.". I also put a sentence at the end of the Motivation section that "*The feature of task expansion is out of the scope of this proposal and will be addressed in a future SEP*". The second paragraph in the Motivation section is mainly used to explain the thinking process that we have gone through, what other alternatives we have considered, and what we plan to do in Samza in the next step.
> > > > > > > > > >> > To answer your question why increasing the partition number will increase the throughput of the Kafka consumer in the container, the Kafka consumer can potentially fetch more data in one FetchResponse with more partitions in the FetchRequest. This is because we limit the maximum amount of data that can be fetched for a given partition in the FetchResponse. This by default is set to 1 MB.
> > > > > > > > > >> > And there are reasons that we cannot arbitrarily bump up this limit.
> > > > > > > > > >> > To answer your question how partition expansion in Kafka impacts the clients, the Kafka consumer is able to automatically detect new partitions of the topic and reassign all (both old and new) partitions across consumers in the consumer group IF you tell the consumer the topic to be subscribed. But the consumer in Samza's container uses another way of subscription. Instead of subscribing to the topic, the consumer in Samza's container subscribes to the specific partitions of the topic. In this case, if new partitions have been added, Samza will need to explicitly subscribe to the new partitions of the topic. The "Handle partition expansion while tasks are running" section in the SEP addresses this issue in Samza: it recalculates the job model and restarts the container so that the consumer can subscribe to the new partitions.
> > > > > > > > > >> > I will ask other devs to take a look at the proposal. I will start the voting thread tomorrow if there is no further concern with the SEP.
> > > > > > > > > >> > Thanks!
> > > > > > > > > >> > Dong
> > > > > > > > > >> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
> > > > > > > > > >> > > Hey Dong,
> > > > > > > > > >> > > > I have updated the motivation section to clarify this.
> > > > > > > > > >> > > Thanks for updating the motivation. Couple of notes here:
> > > > > > > > > >> > > 1.
> > > > > > > > > >> > > > "The motivation of increasing partition number of Kafka topic includes 1) limit the maximum size of a partition in order to improve broker performance and 2) increase throughput of Kafka consumer in the Samza container."
> > > > > > > > > >> > > It's unclear to me how increasing the partition number will increase the throughput of the Kafka consumer in the container. Theoretically, you will still be consuming the same amount of data in the container, irrespective of whether it is coming from one partition or more than one expanded partition. Can you please explain what you mean by that?
> > > > > > > > > >> > > 2.
> > > > > > > > > >> > > I believe the second paragraph under motivation is simply talking about the scope of the current SEP. It will be easier to read if it states what solution is included in this SEP and what is left out as not in scope (for example, whether expansion for stateful jobs is supported or not).
> > > > > > > > > >> > > > We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
> > > > > > > > > >> > > Yes. It does!
> > > > > > > > > >> > > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?
> > > > > > > > > >> > > It's not related. I just meant to give an example of yet another coordinator message that is persisted. Your ssp-to-task mapping is following a similar pattern for persisting. Just wanted to clarify that.
> > > > > > > > > >> > > > Can you let me know if this, together with the answers in the previous email, addresses all your questions?
> > > > > > > > > >> > > Yes.
> > > > > > > > > >> > > I believe you have addressed most of my questions. Thanks for taking time to do that.
> > > > > > > > > >> > > > Is there specific question you have regarding partition expansion in Kafka?
> > > > > > > > > >> > > I guess my questions are on how partition expansion in Kafka impacts the clients. Iiuc, partition expansions are done manually in Kafka based on the bytes-in rate of the partition. Do the existing Kafka clients handle this expansion automatically? If yes, how does it work? If not, are there plans to support it in the future?
> > > > > > > > > >> > > > Thus user's job should not need to bootstrap key/value store from the changelog topic.
> > > > > > > > > >> > > Why is this discussion relevant here? Key/value store / changelog topic partition is scoped with the context of a task. Since we are not changing the number of tasks, I don't think it is required to mention it here.
> > > > > > > > > >> > > > The new method takes the SystemStreamPartition-to-Task assignment from the previous job model which can be read from the coordinator stream.
> > > > > > > > > >> > > The JobModel is currently not persisted to the coordinator stream. In your design, you talk about writing separate coordinator messages for ssp-to-task assignments. Hence, please correct this statement. It is kind of misleading to the reader.
> > > > > > > > > >> > > My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system and yet, we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design). I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view for this from other devs.
> > > > > > > > > >> > > Please make sure you have enough eyes on this SEP. If you do, please start a VOTE thread to approve this SEP.
> > > > > > > > > >> > > Thanks!
> > > > > > > > > >> > > Navina
> > > > > > > > > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > >> > > > Hey Navina,
> > > > > > > > > >> > > > I have updated the wiki based on your suggestion. More specifically, I have made the following changes:
> > > > > > > > > >> > > > - Improved the Problem section and Motivation section to describe why we use the solution in this proposal instead of tackling the problem of task expansion directly.
> > > > > > > > > >> > > > - Illustrated the design in a way that doesn't bind to Kafka. Kafka is only used as an example to illustrate why we want to support partition expansion and whether the operational requirement can be supported when Kafka is used as the input system. Note that the proposed solution should work for any input system that meets the operational requirement described in the wiki.
> > > > > > > > > >> > > > - Fixed the problem in the figure.
> > > > > > > > > >> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki. Together with GroupByPartitionFixedTaskNum, it should ensure that we have a solution to enable partition expansion for all users that are using a pre-defined grouper in Samza. Note that those users who use a custom grouper would need to update their implementation.
> > > > > > > > > >> > > > Can you let me know if this, together with the answers in the previous email, addresses all your questions? Thanks for taking time to review the proposal.
> > > > > > > > > >> > > > Regards,
> > > > > > > > > >> > > > Dong
> > > > > > > > > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > >> > > > > Hey Navina,
> > > > > > > > > >> > > > > Thanks much for your comments. Please see my reply inline.
> > > > > > > > > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
> > > > > > > > > >> > > > >> Thanks for the SEP, Dong. I have a couple of questions to understand your proposal better:
> > > > > > > > > >> > > > >> * Under motivation, you mention that "_We expect this solution to work similarly with other input system as well._", yet I don't see any discussion on how it will work with other input systems. That is, what kind of contract does Samza expect from other input systems? If we are not planning to provide a generic solution, it might be worth calling it out in the SEP.
> > > > > > > > > >> > > > > I think the contract we expect from other systems is exactly the operational requirement mentioned in the SEP, i.e. partitions should always be doubled and the hash algorithm should take the hash modulo the number of partitions.
> > > > > > > > > >> > > > > SEP-5 should also allow partition expansion of all input systems that meet these two requirements. I have updated the motivation section to clarify this.
> > > > > > > > > >> > > > >> * I understand the partition mapping logic you have proposed. But I think the example explanation doesn't match the diagram. In the diagram, after expansion, partition-0 and partition-1 are pointing to bucket 0 and partition-3 and partition-4 are pointing to bucket 1. I think the former has to be partition-0 and partition-2 and the latter partition-1 and partition-3. If I am wrong, please help me understand the logic :)
> > > > > > > > > >> > > > > Good catch. I will update the figure to fix this problem.
> > > > > > > > > >> > > > >> * I don't know how partition expansion in Kafka works.
> > > > > > > > > >> > > > >> I am familiar with how shard splitting happens in Kinesis - there is a hierarchical relation between the parent and child shards. This way, it will also allow the shards to be merged back. Iiuc, Kafka only supports partition "expansion", as opposed to "splits". Can you provide some context or link related to how partition expansion works in Kafka?
> > > > > > > > > >> > > > > I couldn't find any wiki on partition expansion in Kafka. The partition expansion logic in Kafka is very simple: it simply adds new partitions to the existing topic. Is there a specific question you have regarding partition expansion in Kafka?
> > > > > > > > > >> > > > >> * Are you only recommending that expansion can be supported for Samza jobs that use Kafka as input systems **and** configure the SSPGrouper as GroupByPartitionFixedTaskNum?
> > > > > > > > > >> > > > >> Sounds to me like this only applies for GroupByPartition. Please correct me if I am wrong. What is the expectation for custom SSP Groupers?
> > > > > > > > > >> > > > > The expansion can be supported for Samza jobs if the input system meets the operational requirement mentioned above. It doesn't have to use Kafka as the input system.
> > > > > > > > > >> > > > > The current proposal provides a solution for jobs that currently use GroupByPartition. The proposal can be extended to support jobs that use other groupers that are pre-defined in Samza. A custom SSP grouper needs to handle partition expansion similarly to how GroupByPartitionFixedTaskNum handles it, and it is the users' responsibility to update their custom grouper implementation.
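As a rough illustration of the behaviour a fixed-task-number grouper would need, here is a minimal sketch. It is not the SEP-5 implementation and not Samza's grouper interface: the function name, the tuple representation of a SystemStreamPartition, and the "Partition N" task naming are all assumptions made for the example. The sketch derives each stream's original partition count from the number of tasks that held that stream in the previous assignment, then folds every partition onto one of those tasks with a modulo.

```python
from collections import defaultdict


def group_fixed_task_num(ssps, previous_assignment=None):
    """Group (system, stream, partition) tuples without changing the task count.

    previous_assignment: hypothetical dict of task name -> set of tuples, standing in
    for the mapping the thread proposes to persist in the coordinator stream.
    """
    previous_assignment = previous_assignment or {}

    # Original partition count per stream = number of tasks that previously held it.
    tasks_per_stream = defaultdict(set)
    for task, owned in previous_assignment.items():
        for system, stream, _ in owned:
            tasks_per_stream[(system, stream)].add(task)

    # Fallback for a first run: one task per current partition.
    current_count = defaultdict(int)
    for system, stream, _ in ssps:
        current_count[(system, stream)] += 1

    assignment = defaultdict(set)
    for system, stream, partition in ssps:
        key = (system, stream)
        original = len(tasks_per_stream[key]) or current_count[key]
        assignment[f"Partition {partition % original}"].add((system, stream, partition))
    return dict(assignment)


if __name__ == "__main__":
    first = group_fixed_task_num([("kafka", "pv", p) for p in range(2)])
    second = group_fixed_task_num([("kafka", "pv", p) for p in range(4)], first)
    for task in sorted(second):
        print(task, sorted(second[task]))
```

In this sketch, expanding the stream from 2 to 4 partitions keeps two tasks, with partitions 0 and 2 on one task and 1 and 3 on the other. Feeding the result back in as the previous assignment after a further expansion still yields two tasks.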
> > > > > > > > > >> > > > >> * Regarding storing SSP-to-Task assignment to coordinator stream: Today, the JobModel encapsulates the data model in Samza, which also includes **TaskModels**. A TaskModel typically shows the task-to-sspList mapping. What is the reason for using a separate coordinator stream message *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in the coordinator stream today? The reason locality exists outside of the JobModel is that *locality* information is written by each container, whereas it is consumed only by the leader jobcoordinator/AM. In this case, the writer of the mapping information and the reader are both still the leader jobcoordinator/AM. So, I want to understand the motivation for this choice.
> > > > > > > > > >> > > > > Yes, the reason for using a separate coordinator stream message is that the task-to-sspList mapping is not currently persisted in the coordinator stream. We wouldn't need to create this new stream message if the JobModel were persisted. We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
> > > > > > > > > >> > > > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?
> > > > > > > > > >> > > > > Thanks!
> > > > > > > > > >> > > > > Dong
> > > > > > > > > >> > > > >> Cheers!
> > > > > > > > > >> > > > >> Navina
> > > > > > > > > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > > > >> > > > >> > Hi all,
> > > > > > > > > >> > > > >> > We created SEP-5: Enable partition expansion of input streams. Please find the SEP wiki at the link https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams .
> > > > > > > > > >> > > > >> > Your feedback is appreciated!
> > > > > > > > > >> > > > >> > Thanks,
> > > > > > > > > >> > > > >> > Dong