BTW, I will update the SEP-5 wiki with our latest discussion once I get wiki edit access.
On Sat, Jun 17, 2017 at 11:36 PM, Dong Lin <lindon...@gmail.com> wrote:
> Thanks everyone for the comments!
>
> I am currently leaning towards the current approach. I think Kartik raised a good point that the extra repartitioning stage will also incur additional throughput on Kafka, in addition to the potential storage cost. Could other Samza developers also chime in and share their opinions on this proposal?
>
> Since this discussion thread has been open for three weeks, I will start a voting thread on Monday if there are no major revision suggestions.
>
> Thanks,
> Dong
>
>
> On Thu, Jun 15, 2017 at 6:32 PM, Kartik Paramasivam <kparamasi...@linkedin.com.invalid> wrote:
>
>> Great discussion!
>>
>> Here are some more thoughts.
>>
>> The point that repartitioning is a more general-purpose solution is surely spot on. For many source systems (Kinesis, Google Pub/Sub, and older queuing systems such as RabbitMQ), repartitioning is functionally required anyway to do even simple keyed aggregations. But in most of these systems, the concept of repartitioning either does not exist or exists in a very unique way (e.g. Kinesis).
>>
>> I think this feature is really only interesting for source systems like Kafka and EventHub. EventHub (last I checked) didn't support repartitioning, so this is probably not super-interesting (yet) for EventHub.
>>
>> So Kafka is clearly the main use case here.
>>
>> For Kafka, I think it is pretty rare for people to customize the hashing algorithm for sending messages. I would argue that less than 5% of the population (I am being generous ;)) would do that. The current proposal works with the default hashing scheme for Kafka, so organizations will typically never have to coordinate.
>>
>> If the proposed alternative (always repartition) were side-effect free, then it would make sense to use an alternative design that would work for 100% of the population.
>> Repartitioning all input, however, would not be a feasible solution (at least at LinkedIn), as it would double the Kafka workload. If many Samza jobs read from Kafka topics, the increase would be a function of the number of Samza jobs.
>>
>> For low-throughput Kafka topics, explicit repartitioning using the fluent API is surely feasible.
>>
>> If the proposal were to make this new policy the default, that would clearly not make much sense.
>>
>> But it is an opt-in policy. If it is not applicable, people don't have to use it.
>>
>> I do have some questions about the implementation. I will try to respond after spending some more time on this.
>>
>>
>>
>> On Thu, Jun 15, 2017 at 7:53 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>>
>> > Thanks, Dong.
>> >
>> > The summary looks accurate.
>> >
>> > I'll let the others chime in, as I believe my perspective has been adequately captured in this thread.
>> >
>> > -Jake
>> >
>> > On Wed, Jun 14, 2017 at 12:12 PM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > > Hey Jacob,
>> > >
>> > > Thank you for taking so much time to discuss this with me! I appreciate the discussion and the insight. I will summarize our discussion below.
>> > >
>> > > 1) Whether it is reasonable to store the partition-to-task mapping.
>> > >
>> > > We agree that storing the partition-to-task mapping will be reasonable if we allow users to specify either the new-partition-to-old-partition mapping or the key-to-partition mapping in the future. SEP-5 doesn't currently provide a way for users to specify the new-partition-to-old-partition mapping because we won't have a good idea of what that interface should look like until we try to enable partition expansion for input systems other than Kafka. This is currently listed as the third item of future work in SEP-5.
>> > >
>> > > And if we decide to implement SEP-5, I will include a warning message regarding the use of the partition-to-task mapping, i.e.
"this does not specify >> the >> > > key-to-task mapping". We agree that this could address the concern >> here. >> > > >> > > 2) Whether we should follow the approach in SEP-5 or use an extra >> > > re-partitioning stage in the stateful Samza job to enable partition >> > > expansion. >> > > >> > > Here are the pros and cons of the extra re-partitioning stage in >> > comparison >> > > to SEP-5. >> > > >> > > Pros: >> > > - It doesn't require owner of the Samza job to know the partitioning >> > > algorithm of used for the input stream. If the owner of the Samza job >> is >> > in >> > > a different organization than the producer of the input stream, this >> > > solution frees different organizations from having to coordinate with >> > each >> > > other. >> > > - It doesn't require owner of the Samza job to specify the >> partitioning >> > > algorithm of used for the input stream. Thus less config. >> > > >> > > Cons: >> > > - User has to make code change on their side to use the new fluent >> API. >> > > - The extra partitioning stage would potentially increases latency. >> > > - The extra partitioning stage would incur additional cost due to the >> > extra >> > > internal topic. The cost is probably not that much with the new trim() >> > API >> > > in Kafka if Samza uses Kafka to store the internal topic. But the cost >> > may >> > > be doubled if Samza uses another input system that doesn't provide >> trim() >> > > API to delete data on demand. >> > > >> > > My recommendation is to adopt a hybrid solution, i.e. we still >> implement >> > > the current proposal in SEP-5 so that we enable partition expansion >> > without >> > > incurring extra latency/cost and without requiring users to change >> their >> > > code. And we can recommend user to use the extra partitioning stage if >> > the >> > > coordination among different organization is indeed a concern. >> > > >> > > Can other developers also provide feedback regarding your preference >> > > between the two? 
>> > >
>> > > Thanks,
>> > > Dong
>> > >
>> > >
>> > > On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>> > >
>> > > > Hey Dong,
>> > > >
>> > > > I appreciate your thoughtful responses. Let's do one more round :-)
>> > > >
>> > > > > Here are my current concerns with the three alternatives you described earlier:
>> > > > > - The first alternative requires support from the input system, which is currently not available. It will limit partition expansion to only the systems that support such an interface, and it is not guaranteed that we can persuade the developers of the input system to add this interface. This is not desirable for Samza in the long term.
>> > > >
>> > > > Agreed. It is very wishful thinking that each supported system would provide such a contract.
>> > > >
>> > > > > - I cannot comment on the second alternative because I don't understand how it reshuffles all existing changelog data. We can discuss more if there is more specific detail. My gut feel is that this will be complex and carry performance overhead.
>> > > >
>> > > > After giving this more thought, I agree. There is no clear way to migrate a changelog without knowing the original key->partition mapping. Which leads us to alternative 3...
>> > > >
>> > > > > - The third alternative incurs performance overhead. Given that users can already use this solution to enable partition expansion, maybe Samza developers can provide more input as to why we are not doing it by default. My gut feel is that it carries considerable performance overhead and increases the cost-to-serve of Samza jobs (e.g.
>> > > > > disk usage), which may make it undesirable in the long term.
>> > > >
>> > > > I think the only performance overhead would be the mandatory repartitioning stage for stateful jobs. But a repartitioner is usually much faster than the downstream stateful job, so it seems to be only a cost-to-serve issue.
>> > > >
>> > > > As for why we aren't already doing this, I would posit that before the introduction of the high-level API, which trivializes repartitioning, it was unreasonable to expect each job owner to do the mandatory repartitioning. With the high-level API, I would argue this is much more doable.
>> > > >
>> > > > > I am not sure it is true that "any future feature that utilizes this mapping without accounting for the assumptions of this SEP is likely to malfunction". Suppose we allow users to specify the new-to-old-partition mapping; then we can use the partition-to-task mapping correctly without relying on the assumptions in this SEP, right?
>> > > >
>> > > > Right, but my point was that the partition->task mapping is not sufficient by itself. So adding it by itself is potentially misleading.
>> > > >
>> > > > On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > >
>> > > > > Thanks for the reply, Jacob. Please see my comments inline.
>> > > > >
>> > > > > On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <jacob.m...@gmail.com> wrote:
>> > > > >
>> > > > > > > - For users that need partition expansion of the input streams for stateful jobs, they have a really big headache because Samza does not allow partition expansion for stateful jobs. SEP-5 addresses this headache for them.
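Why partition expansion is a headache for stateful jobs in the first place can be shown with a short Python sketch. It assumes the producer uses hash+modulo partitioning (here `zlib.crc32` stands in for the murmur2 hash used by Kafka's default partitioner) and that each input partition is consumed by its own task with its own local store; `partition_for` and `moved_keys` are illustrative names only. After doubling from 4 to 8 partitions, roughly half of the keys move to a new partition, and therefore to a task that holds none of their earlier state.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Hash+modulo partitioning; crc32 stands in for the producer's real hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def moved_keys(keys, old_count: int, new_count: int):
    """Keys whose partition changes; with one task per partition, their task changes too."""
    return [k for k in keys if partition_for(k, old_count) != partition_for(k, new_count)]


keys = [f"user-{i}" for i in range(1000)]
moved = moved_keys(keys, old_count=4, new_count=8)
print(f"{len(moved)} of {len(keys)} keys would be served by a task without their state")

# With hash+modulo and a doubled count, a moved key always goes from partition p
# to partition p + 4 (the old count); it never lands anywhere else.
assert all(partition_for(k, 8) == partition_for(k, 4) + 4 for k in moved)
```

The regular "split" pattern in the final check is what the hash+modulo requirement buys: it is the property a fixed-task-count grouping can exploit to keep every key on its original task.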
>> > > > > > >
>> > > > > > > You are right that SEP-5 requires users to understand and enforce limitations across organizations. But it is still much better than not allowing users to expand partitions for stateful jobs at all, right? Did I miss something here?
>> > > > > >
>> > > > > > I guess this one is a matter of perspective.
>> > > > > >
>> > > > > > One argument is that if the system supports one case, it's better than none, because there is one less scenario in which the system does the wrong thing.
>> > > > > >
>> > > > > > The counter-argument is for uniform and consistent behavior, which is easy for users to understand and properly leverage.
>> > > > > >
>> > > > > > Specifically, I'd argue that the current rule is very simple: "you cannot repartition inputs on a stateful job, so you must over-partition the initial implementation". To me, while that rule is not ideal, its simplicity is better than introducing a new solution that has a bunch of caveats, any one of which could be missed. If any one of the assumptions in this SEP design is violated, the job would behave incorrectly. That puts a lot more burden on the users than the simpler rule.
>> > > > >
>> > > > > I agree that we have different perspectives here. It is true that users could mess up their jobs if they used this feature the wrong way, i.e. violated the assumptions made in SEP-5. On the other hand, I think there is always a way for users to mess up their jobs if they configure them incorrectly.
>> > > > > I also think the assumptions made in this SEP are not particularly harder to understand than other existing configs in Samza.
>> > > > >
>> > > > > The answer to this can be subjective. I would love to hear perspectives from other developers on this issue.
>> > > > >
>> > > > > > That's why I mentioned a few alternatives that, while more complex to implement, would provide more consistent behavior with simple rules for the users.
>> > > > >
>> > > > > I am open to discussing alternative solutions that can address the problem in a better manner. I am not opposed to complexity as long as it gives us good long-term benefits.
>> > > > >
>> > > > > Here are my current concerns with the three alternatives you described earlier:
>> > > > >
>> > > > > - The first alternative requires support from the input system, which is currently not available. It will limit partition expansion to only the systems that support such an interface, and it is not guaranteed that we can persuade the developers of the input system to add this interface. This is not desirable for Samza in the long term.
>> > > > >
>> > > > > - I cannot comment on the second alternative because I don't understand how it reshuffles all existing changelog data. We can discuss more if there is more specific detail. My gut feel is that this will be complex and carry performance overhead.
>> > > > >
>> > > > > - The third alternative incurs performance overhead. Given that users can already use this solution to enable partition expansion, maybe Samza developers can provide more input as to why we are not doing it by default.
>> > > > > My gut feel is that it carries considerable performance overhead and increases the cost-to-serve of Samza jobs (e.g. disk usage), which may make it undesirable in the long term.
>> > > > >
>> > > > > > > Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well. If more grouper classes are needed in the future, we can solve this problem cleanly without a new config. Given the previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will throw an exception unless newGrouperClass is previousGrouperClass or a subclass of it. GroupBySystemStreamPartitionWithFixedTaskNum should extend GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum should extend GroupByPartition. Does this address your concern?
>> > > > > >
>> > > > > > Sounds workable, thanks.
>> > > > > >
>> > > > > > > Can you be more specific about why the partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments, and why it is incomplete and misleading?
>> > > > > >
>> > > > > > A partition is (in my naive interpretation) an independent queue for messages of a particular key set. It is not the *identity* of the partition that determines the contents of the associated task's local state. Rather, it is the *contents* of the partition that affect the task's state. A partition-to-task mapping only captures an identity relationship: partition1->task1.
>> > > > > > Without the assumptions of this SEP, this is insufficient to determine the assignment of keys to tasks, which is what really matters. Therefore, any future feature that utilizes this mapping without accounting for the assumptions of this SEP is likely to malfunction.
>> > > > >
>> > > > > I am not sure it is true that "any future feature that utilizes this mapping without accounting for the assumptions of this SEP is likely to malfunction". Suppose we allow users to specify the new-to-old-partition mapping; then we can use the partition-to-task mapping correctly without relying on the assumptions in this SEP, right?
>> > > > >
>> > > > > > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Hey Jacob,
>> > > > > > >
>> > > > > > > Thanks for the explanation. It seems that your biggest concern is with the generality of the proposal. Let me try to address this and the other comments below.
>> > > > > > >
>> > > > > > > 1) ... it will cause headaches for Samza users ...
>> > > > > > >
>> > > > > > > I am not sure I understand why this proposal causes headaches for Samza users. Here is the impact of SEP-5 on users:
>> > > > > > >
>> > > > > > > - For users that do not need partition expansion of the input stream, they can use Samza without any change in code/binary/config. Thus there is no headache for them.
>> > > > > > >
>> > > > > > > - For users that need partition expansion of the input streams for stateless jobs, they currently need to manually restart their Samza job in order to let Samza consume the new partitions created for the stream. SEP-5 actually reduces their headache by allowing Samza to automatically detect and consume new partitions.
>> > > > > > >
>> > > > > > > - For users that need partition expansion of the input streams for stateful jobs, they have a really big headache because Samza does not allow partition expansion for stateful jobs. SEP-5 addresses this headache for them.
>> > > > > > >
>> > > > > > > You are right that SEP-5 requires users to understand and enforce limitations across organizations. But it is still much better than not allowing users to expand partitions for stateful jobs at all, right? Did I miss something here?
>> > > > > > >
>> > > > > > > 2) ... Separate orgs are often difficult to coordinate and a system which depends on such significant process/coordination is too fragile for my taste ...
>> > > > > > >
>> > > > > > > This is true. Ideally we want a system that is fully self-serve. I think this is a long-term goal for Samza. Still, for the reasons described above, I think something is better than nothing. I am open to an alternative design that can support partition expansion for stateful jobs without requiring coordination.
>> > > > > > >
>> > > > > > > 3) There is currently no supported way of sharing state among the tasks of a container. Each task has its own isolated store and that logical isolation is the primary thing that enables Samza jobs to scale with a simple container count change. My feeling is that we should not change this without good reason.
>> > > > > > >
>> > > > > > > I see your point. I will remove this sentence from the motivation section. This won't have any impact on the design of SEP-5. Does this address the problem?
>> > > > > > >
>> > > > > > > 4) With the current proposal, we'd also need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum. And if any other groupers were later added with both these modes, we'd probably need to add those too. It might be easier and cleaner to add a config to ignore that check temporarily. The downside is that it further complicates the Samza config, which is already huge. Thoughts?
>> > > > > > >
>> > > > > > > Yes, we need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum as well. If more grouper classes are needed in the future, we can solve this problem cleanly without a new config. Given the previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will throw an exception unless newGrouperClass is previousGrouperClass or a subclass of it.
>> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum should extend GroupByPartition. Does this address your concern?
>> > > > > > >
>> > > > > > > 5) The task-to-container and container-to-host mappings are both meaningful in the context of the JobModel. The partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments. It's incomplete information and therefore misleading. I think it only makes sense to use this mapping if we adopt a solution wherein Samza also knows the partition key assignment.
>> > > > > > >
>> > > > > > > The partition-to-task mapping is currently passed explicitly from the job coordinator to each task as part of the job model, to tell tasks which partitions to consume from. I think we can store some definition of the key-to-partition assignments if Samza decides to get and use this information in the future. Can you be more specific about why the partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments, and why it is incomplete and misleading?
>> > > > > > >
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Dong
>> > > > > > >
>> > > > > > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <jacob.m...@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Hey Dong,
>> > > > > > > >
>> > > > > > > > I'm opposed (or a +0, at best) to this limited, Kafka-specific solution.
>> > > > > > > > I understand that the proposal is relatively simple to implement, but I think it will cause headaches for Samza users. They will not only have to understand all the limitations of this approach (increase only, double partitions only, partition using hash+modulo, etc.), but enforcing these limitations can be a major problem, especially when the Samza jobs and message brokers are managed by separate orgs in a company. Separate orgs are often difficult to coordinate and a system which depends on such significant process/coordination is too fragile for my taste.
>> > > > > > > >
>> > > > > > > > That said, I realize that my opinion is just one of many in the broader community, which may feel differently, so let me respond to some of the other items in the discussion so we can clear them up:
>> > > > > > > >
>> > > > > > > > > The task-to-container assignment matters because the correlated tasks (i.e. tasks that consume messages with the same key) need to be in the same container so that they can share the same key/value local store on the same physical machine.
>> > > > > > > >
>> > > > > > > > There is currently no supported way of sharing state among the tasks of a container. Each task has its own isolated store and that logical isolation is the primary thing that enables Samza jobs to scale with a simple container count change.
>> > > > > > > > My feeling is that we should not change this without good reason.
>> > > > > > > >
>> > > > > > > > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that an exception will not be thrown if the new grouper is GroupByPartitionWithFixedTaskNum and the old grouper is GroupByPartition. Does this look reasonable?
>> > > > > > > >
>> > > > > > > > With the current proposal, we'd also need a similar check for GroupBySystemStreamPartitionWithFixedTaskNum. And if any other groupers were later added with both these modes, we'd probably need to add those too. It might be easier and cleaner to add a config to ignore that check temporarily. The downside is that it further complicates the Samza config, which is already huge. Thoughts?
>> > > > > > > >
>> > > > > > > > > I think storing the previous task-to-partition mapping is more general than storing the partition count of all topics, for the following reasons:
>> > > > > > > > > - Samza already stores the task-to-container mapping and container-to-host mapping in the coordinator stream. It seems consistent to also store the partition-to-task mapping. And this information may be useful for other use cases such as debugging.
>> > > > > > > > > - By having the new interface take the previous task-to-partition assignment instead of a topic-to-partition-count mapping as a new parameter, we can potentially have grouper implementations that support other types of input systems.
>> > > > > > > > > - It is slightly simpler to store the task-to-partition assignment because we don't need to know whether this is the first time a job is started or not. On the other hand, the topic-to-partition-count mapping can be written to the coordinator stream only the first time the job is run.
>> > > > > > > >
>> > > > > > > > The task-to-container and container-to-host mappings are both meaningful in the context of the JobModel. The partition-to-task mapping is not meaningful without some definition of the key-to-partition assignments. It's incomplete information and therefore misleading. I think it only makes sense to use this mapping if we adopt a solution wherein Samza also knows the partition key assignment.
>> > > > > > > >
>> > > > > > > > -Jake
>> > > > > > > >
>> > > > > > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > > Hey Jacob,
>> > > > > > > > >
>> > > > > > > > > Thanks for taking the time to review the SEP.
>> > > > > > > > >
>> > > > > > > > > I agree with you and Navina that the current SEP doesn't provide support for arbitrary input systems and doesn't support partition shrinking.
>> > > > > > > > > I think the scope of this SEP is to support partition expansion for Kafka (the most widely used input system of Samza) and keep the door open for partition expansion support in other input systems. The current design can support any system that meets the two operational requirements specified in the doc.
>> > > > > > > > >
>> > > > > > > > > While it is possible to support more types of input systems, doing so will likely add more complexity to the design. For example, your first alternative solution requires broker-side support to negotiate the hash algorithm. The second alternative solution requires reshuffling the changelog partitions, which carries its own design complexity and performance overhead. There is a tradeoff between generality and complexity among these choices. I like the current design because it is simple and addresses a big usage scenario for us. We can add more complexity to generalize the design if it enables important use cases. Does this sound reasonable?
>> > > > > > > > >
>> > > > > > > > > Note that the "Rejected Alternative" section also mentions the possibility of supporting a wider range of input systems by allowing users to specify the new-partition-to-old-partition mapping.
>> > > > > > > > > We are not doing it because 1) we may have a better understanding of the design after we have a specific second input system to support, and 2) the current design can be extended to support general input systems. I think a similar argument can be applied to explain why we don't have to support general input systems using the potentially good alternatives you mentioned.
>> > > > > > > > >
>> > > > > > > > > I hope SEP-5 can be an important first step towards supporting partition expansion of any input system.
>> > > > > > > > >
>> > > > > > > > > To answer your questions about the current proposal:
>> > > > > > > > >
>> > > > > > > > > > 1. "An alternative solution is to allow task number to increase after partition expansion and uses a proper task-to-container assignment to make sure the Samza output is correct." What does the container have to do with stateful processing or output in general?
>> > > > > > > > >
>> > > > > > > > > The task-to-container assignment matters because the correlated tasks (i.e. tasks that consume messages with the same key) need to be in the same container so that they can share the same key/value local store on the same physical machine.
>> > > > > > > > >
>> > > > > > > > > > 2. When you use "Join" as an example, you basically mean multiple co-partitioned streams, right?
> This is opposed to multiple, independently-partitioned streams or a single stream. It would be nice to formulate the proposal in these more general terms.

I thought "join" was commonly used to refer to the join operation on co-partitioned streams, but I may be wrong. I have updated the wiki to explicitly mention "co-partitioned stream". Does this look better now?

> 3. When switching SSP groupers, how will the users avoid the org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues exception?

I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that the exception will not be thrown if the new grouper is GroupByPartitionWithFixedTaskNum and the old grouper is GroupByPartition. Does this look reasonable?

> 4. Partition to task assignment is meaningless without key to partition mapping. The real semantics are captured in the external requirement for partitioning via hash+modulo. But in that case, iiuc, only the partition count matters. So why not just store the original partition count rather than the whole mapping?
I think storing the previous task-to-partition mapping is more general than storing the partition count of all topics, for the following reasons:

- Samza already stores the task-to-container mapping and the container-to-host mapping in the coordinator stream. It seems consistent to also store the partition-to-task mapping. And this information may be useful for other use cases such as debugging.

- By having the new interface take the previous task-to-partition assignment instead of a topic-to-partition-count mapping as the new parameter, we can potentially have grouper implementations that support other types of input systems.

- It is slightly simpler to store the task-to-partition assignment because we don't need to know whether this is the first time the job is started. By contrast, the topic-to-partition-count mapping could only be written to the coordinator stream the first time the job runs.

Thanks,
Dong

On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <jacob.m...@gmail.com> wrote:

Hey Dong,

Thanks for the SEP.
Supporting partition changes is critically important for stateful Samza jobs, so it's great to see some ideas on that front!

Sorry for the late feedback, but I have a few thoughts to contribute.

Big +1 on Navina's comment:

> My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system and yet, we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design). I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view for this from other devs.

Two examples of this:
1. This is mostly hypothetical, but some message brokers may use key range assignment rather than hash+modulo.
2. Kafka can't reduce the number of partitions, but that can happen on other systems.
For example, it may be cheaper to reduce the number of partitions on a hosted service where the cost model depends on the number of partitions/shards.

It seems to me that we should look for a solution which doesn't depend on partition key assignment in the message broker. Here are a few alternatives that weren't discussed and that I think should be considered:

Alternatives in order of increasing preference:
1. Samza manages the partition hash (via some new contract with the brokers) and guarantees correct routing of keys among the new partitions.
2. Samza detects a task count change, creates a new changelog with the correct partitions, and *somehow* reshuffles all existing changelog data into the new topic and then uses the new topic from then on. (This doesn't work without a changelog, but in that case durability isn't paramount, so we can just wipe the state.)
3. Use RPC between stages and let Samza fully manage key assignment among tasks. No on-disk topic data to clean up. Mandatory repartitioning in the first stage to pre-scaled tasks in the next stage.
4. A combined 2-3 solution.

Finally, some questions about the current proposal:
1. "An alternative solution is to allow task number to increase after partition expansion and uses a proper task-to-container assignment to make sure the Samza output is correct." What does the container have to do with stateful processing or output in general?
2. When you use "Join" as an example, you basically mean multiple co-partitioned streams, right? This is opposed to multiple, independently-partitioned streams or a single stream. It would be nice to formulate the proposal in these more general terms.
3. When switching SSP groupers, how will the users avoid the org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues exception?
4. Partition to task assignment is meaningless without key to partition mapping. The real semantics are captured in the external requirement for partitioning via hash+modulo. But in that case, iiuc, only the partition count matters. So why not just store the original partition count rather than the whole mapping?

-Jake

On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Yi, Navina,

I have updated the SEP-5 document based on our discussion.
The difference can be found here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>. Here is a summary of the changes:

- Added a new interface that extends the existing interface SystemStreamPartitionGrouper. Newly added grouper classes should implement this interface.
- Explained in the Rejected Alternative section why we don't add a new method to the existing interface.
- Explained in the Rejected Alternative section why we don't provide a config/class for users to specify the new-partition to old-partition mapping.

Can you take another look at the proposal and let me know if there are any concerns?

Cheers,
Dong

On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Yi,

Thanks much for the comment. I have updated the doc to address all your comments except the one related to the interface. I am not sure I understand your suggestion of the new interface. Will discuss tomorrow.
Thanks,
Dong

On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:

Hi Dong,

Thanks for the detailed design doc for a long-awaited feature in Samza! Really appreciate it! I did a quick pass and have the following comments:

- minor: "limit the maximum size of partition" ==> "limit the maximum size of each partition"
- "However, Samza currently is not able to handle partition expansion of the input streams" ==> better to point out "for stateful jobs". For stateless jobs, simply bouncing the job will now pick up the new partitions.
- "it is possible (e.g. with Kafka) that messages with a given key exists in both partition 1 an 3. Because GroupByPartition will assign partition 1 and 3 to different tasks, messages with the same key may be handled by different task/container/process and their state will be stored in different changelog partition." The problem statement is not super clear here.
The issue with stateful jobs is: after GroupByPartition assigns partitions 1 and 3 to different tasks, the new task handling partition 3 does not have the previous state to resume the work. E.g. a page-key based counter would start from 0 in the new task for a specific key, instead of resuming the previous count of 50 held by task 1.
- minor rewording: "the first solution in this doc" ==> "the solution proposed in this doc"
- "Thus additional development work is needed in Kafka to meet this requirement": it would be good to link to a KIP if and when it exists.
- Instead of touching/deprecating the interface SystemStreamPartitionGrouper, I would recommend having a different implementation class of the interface whose constructor takes two parameters: a) the previous task number read from the coordinator stream; b) the configured new-partition to old-partition mapping policy.
Then the grouper's interface method stays the same, and the behavior of the grouper is more configurable, which is good for supporting a broader set of use cases in addition to Kafka's built-in partition expansion policies.
- Minor renaming suggestion for the new grouper class names: GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum

Thanks!

- Yi

On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Navina,

Thanks much for the comment. Please see my response below.

Regarding your biggest gripe with the SEP, I personally think the operational requirements proposed in the SEP are pretty general and could easily be enforced by other systems. The reason is that the modulo operation is pretty standard and is the default option for choosing a partition.
And the underlying system usually allows users to select an arbitrary partition number if it supports partition expansion. Do you know of any system that does not meet these two requirements?

Regarding your comment on the Motivation section, I renamed the first section to "Problem and Goal" and specified that "*The goal of this proposal is to enable partition expansion of the input streams*." I also added a sentence at the end of the Motivation section: "*The feature of task expansion is out of the scope of this proposal and will be addressed in a future SEP*". The second paragraph in the Motivation section mainly explains the thinking process we have gone through, what other alternatives we have considered, and what we plan to do in Samza in the next step.
To answer your question about why increasing the partition number will increase the throughput of the Kafka consumer in the container: the Kafka consumer can potentially fetch more data in one FetchResponse when there are more partitions in the FetchRequest. This is because we limit the maximum amount of data that can be fetched for a given partition in the FetchResponse. By default this is set to 1 MB, and there are reasons why we cannot arbitrarily bump up this limit.

To answer your question about how partition expansion in Kafka impacts the clients: the Kafka consumer is able to automatically detect new partitions of the topic and reassign all (both old and new) partitions across consumers in the consumer group IF you tell the consumer which topic to subscribe to. But the consumer in Samza's container uses another way of subscription.
Instead of subscribing to the topic, the consumer in Samza's container subscribes to specific partitions of the topic. In this case, if new partitions have been added, Samza needs to explicitly subscribe to the new partitions of the topic. The "Handle partition expansion while tasks are running" section in the SEP addresses this issue in Samza: it recalculates the job model and restarts the containers so that the consumer can subscribe to the new partitions.

I will ask other devs to take a look at the proposal. I will start the voting thread tomorrow if there are no further concerns with the SEP.

Thanks!
Dong

On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:

Hey Dong,

> I have updated the motivation section to clarify this.
Thanks for updating the motivation. A couple of notes here:

1.
> "The motivation of increasing partition number of Kafka topic includes 1) limit the maximum size of a partition in order to improve broker performance and 2) increase throughput of Kafka consumer in the Samza container."

It's unclear to me how increasing the partition number will increase the throughput of the Kafka consumer in the container. Theoretically, you will still be consuming the same amount of data in the container, irrespective of whether it comes from one partition or from several expanded partitions. Can you please explain what you mean by that?

2. I believe the second paragraph under motivation is simply talking about the scope of the current SEP.
It would be easier to read if it stated which solution is included in this SEP and what is left out of scope (for example, whether expansion for stateful jobs is supported or not).

> We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?

Yes. It does!

> I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?

It's not related. I just meant to give an example of yet another coordinator message that is persisted. Your ssp-to-task mapping follows a similar persistence pattern. Just wanted to clarify that.
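The persistence point above (the task-to-SSP mapping lets the job recover each stream's original partition count no matter how many times it has expanded) can be illustrated with a small sketch. It assumes the mapping is available as a dict from task name to a list of SSPs, that each original partition initially had its own task, and that expanded partitions were folded back onto existing tasks (partition p owned by task p mod N). `SSP` and `original_partition_counts` are hypothetical names, not Samza APIs, and this is not necessarily how SEP-5 itself derives the count.

```python
from collections import defaultdict
from typing import Dict, List, NamedTuple, Set, Tuple


class SSP(NamedTuple):
    """Hypothetical stand-in for a SystemStreamPartition."""
    system: str
    stream: str
    partition: int


def original_partition_counts(
    task_to_ssps: Dict[str, List[SSP]]
) -> Dict[Tuple[str, str], int]:
    """Return {(system, stream): original partition count}.

    Assumes partitions p and p + N of an expanded stream share a task, so the
    number of distinct tasks owning a stream's partitions equals N.
    """
    tasks_per_stream: Dict[Tuple[str, str], Set[str]] = defaultdict(set)
    for task_name, ssps in task_to_ssps.items():
        for ssp in ssps:
            tasks_per_stream[(ssp.system, ssp.stream)].add(task_name)
    return {stream: len(tasks) for stream, tasks in tasks_per_stream.items()}


# Example: "PageViewEvent" was doubled from 2 to 4 partitions;
# "AdClick" has always had 4 partitions.
mapping = {
    "Partition 0": [SSP("kafka", "PageViewEvent", 0), SSP("kafka", "PageViewEvent", 2),
                    SSP("kafka", "AdClick", 0)],
    "Partition 1": [SSP("kafka", "PageViewEvent", 1), SSP("kafka", "PageViewEvent", 3),
                    SSP("kafka", "AdClick", 1)],
    "Partition 2": [SSP("kafka", "AdClick", 2)],
    "Partition 3": [SSP("kafka", "AdClick", 3)],
}
print(original_partition_counts(mapping))
# {('kafka', 'PageViewEvent'): 2, ('kafka', 'AdClick'): 4}
```

Counting distinct owning tasks rather than partitions is what keeps the result stable across repeated doublings.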
> Can you let me know if this, together with the answers in the previous email, addresses all your questions?

Yes. I believe you have addressed most of my questions. Thanks for taking the time to do that.

> Is there specific question you have regarding partition expansion in Kafka?

I guess my questions are about how partition expansion in Kafka impacts the clients. IIUC, partition expansions are done manually in Kafka based on the bytes-in rate of the partition. Do the existing Kafka clients handle this expansion automatically? If yes, how does it work? If not, are there plans to support it in the future?

> Thus user's job should not need to bootstrap key/value store from the changelog topic.

Why is this discussion relevant here?
The key/value store and changelog topic partition are scoped to the context of a task. Since we are not changing the number of tasks, I don't think it needs to be mentioned here.

> The new method takes the SystemStreamPartition-to-Task assignment from the previous job model which can be read from the coordinator stream.

The job model is currently not persisted to the coordinator stream. In your design, you talk about writing separate coordinator messages for ssp-to-task assignments. Hence, please correct this statement; it is somewhat misleading to the reader.

My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system, and yet we are trying to masquerade that as operational requirements for other systems interacting with Samza.
(Not to say that this is the first time such a choice is being made in the Samza design.) I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view on this from other devs.

Please make sure you have enough eyes on this SEP. If you do, please start a VOTE thread to approve this SEP.

Thanks!
Navina

On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:

Hey Navina,

I have updated the wiki based on your suggestion. More specifically, I have made the following changes:

- Improved the Problem and Motivation sections to describe why we use the solution in this proposal instead of tackling the problem of task expansion directly.
- Illustrated the design in a way that isn't bound to Kafka. Kafka is only used as an example to illustrate why we want partition expansion and whether the operational requirements can be supported when Kafka is used as the input system. Note that the proposed solution should work for any input system that meets the operational requirements described in the wiki.

- Fixed the problem in the figure.

- Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki. Together with GroupByPartitionFixedTaskNum, it should ensure that we have a solution enabling partition expansion for all users of the pre-defined groupers in Samza.
  Note that users who use a custom grouper would need to update their implementation.

Can you let me know if this, together with the answers in the previous email, addresses all your questions? Thanks for taking the time to review the proposal.

Regards,
Dong

On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:

Hey Navina,

Thanks much for your comments. Please see my reply inline.

On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:

> Thanks for the SEP, Dong.
> I have a couple of questions to understand your proposal better:
>
> * Under motivation, you mention that "_We expect this solution to work similarly with other input system as well._", yet I don't see any discussion of how it will work with other input systems. That is, what kind of contract does Samza expect from other input systems? If we are not planning to provide a generic solution, it might be worth calling that out in the SEP.

I think the contract we expect from other systems is exactly the operational requirement mentioned in the SEP, i.e. partitions should always be doubled, and the partitioning algorithm should take the key hash modulo the number of partitions.
SEP-5 should also allow partition expansion of all input systems that meet these two requirements. I have updated the Motivation section to clarify this.

> * I understand the partition mapping logic you have proposed. But I think the example explanation doesn't match the diagram. In the diagram, after expansion, partition-0 and partition-1 are pointing to bucket 0 and partition-3 and partition-4 are pointing to bucket 1. I think the former has to be partition-0 and partition-2, and the latter partition-1 and partition-3. If I am wrong, please help me understand the logic :)

Good catch. I will update the figure to fix this problem.
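The contract discussed above (the partition count only ever doubles, and a key's partition is its hash modulo the partition count) and Navina's correction to the figure can be checked with a small sketch. It assumes a grouper that keeps the number of tasks fixed at the original partition count and sends partition p to task p % original_count. The helper names `task_for_partition` and `group_partitions` are hypothetical. This is not the actual GroupByPartitionFixedTaskNum code from the SEP.

```python
# Sketch only: hypothetical helpers illustrating a fixed-task-count grouping,
# not the actual GroupByPartitionFixedTaskNum implementation from SEP-5.


def task_for_partition(partition: int, original_count: int) -> int:
    # A partition added by expansion folds back onto the task that owned
    # partition (partition % original_count) before the expansion.
    return partition % original_count


def group_partitions(current_count: int, original_count: int) -> dict[int, list[int]]:
    # Assumed operational requirement: the partition count only ever doubles,
    # so current_count == original_count * 2**k for some k >= 0.
    ratio, remainder = divmod(current_count, original_count)
    if remainder != 0 or ratio < 1 or (ratio & (ratio - 1)) != 0:
        raise ValueError("partition count must be the original count doubled k times")
    # The number of tasks stays fixed at the original partition count.
    groups: dict[int, list[int]] = {task: [] for task in range(original_count)}
    for partition in range(current_count):
        groups[task_for_partition(partition, original_count)].append(partition)
    return groups


if __name__ == "__main__":
    # Two partitions doubled to four, as in the corrected figure.
    print(group_partitions(4, 2))  # {0: [0, 2], 1: [1, 3]}
```

With two original partitions doubled to four, partitions 0 and 2 land on task 0 and partitions 1 and 3 land on task 1. That is the mapping Navina describes for the corrected figure.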
> * I don't know how partition expansion in Kafka works. I am familiar with how shard splitting happens in Kinesis: there is a hierarchical relation between the parent and child shards, which also allows the shards to be merged back. IIUC, Kafka only supports partition "expansion", as opposed to "splits". Can you provide some context or a link related to how partition expansion works in Kafka?

I couldn't find any wiki on partition expansion in Kafka. The partition expansion logic in Kafka is very simple: it just adds new partitions to the existing topic. Is there a specific question you have regarding partition expansion in Kafka?
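Because Kafka expansion just appends partitions, a key only keeps its mapping when the operator follows the doubling requirement. The sketch below makes several assumptions. The producer is assumed to compute partition = hash(key) % partition_count, matching the SEP's "hash modulo the number of partitions" requirement. `zlib.crc32` is a stand-in hash chosen only for illustration; Kafka's default partitioner uses murmur2. The function names are hypothetical. The sketch checks whether every key's new partition, taken modulo the old count, still equals the key's old partition.

```python
import zlib

# Sketch only: zlib.crc32 is a stand-in hash chosen for illustration; Kafka's
# default partitioner hashes keys with murmur2, not CRC32.


def partition_for(key: str, partition_count: int) -> int:
    # Assumed routing rule from the SEP's requirement: hash modulo partition count.
    return zlib.crc32(key.encode("utf-8")) % partition_count


def expansion_preserves_mapping(keys: list[str], old_count: int, new_count: int) -> bool:
    # True when, for every key, the partition it uses after expansion,
    # reduced modulo old_count, is the partition it used before expansion,
    # so a grouper mapping partition p to task p % old_count keeps the key on its task.
    return all(
        partition_for(key, new_count) % old_count == partition_for(key, old_count)
        for key in keys
    )


if __name__ == "__main__":
    keys = [f"member-{i}" for i in range(1000)]
    print(expansion_preserves_mapping(keys, 2, 4))  # True: 4 partitions is 2 doubled
    print(expansion_preserves_mapping(keys, 2, 3))  # not guaranteed: 3 is not a multiple of 2
```

Doubling from 2 to 4 partitions, or 4 to 16 (two doublings), keeps the property for every key, because h % (k * N) % N == h % N for non-negative h. Growing from 2 to 3 partitions gives no such guarantee.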
> * Are you only recommending that expansion can be supported for Samza jobs that use Kafka as the input system **and** configure the SSPGrouper as GroupByPartitionFixedTaskNum? Sounds to me like this only applies to GroupByPartition. Please correct me if I am wrong. What is the expectation for custom SSP groupers?

The expansion can be supported for Samza jobs if the input system meets the operational requirement mentioned above. It doesn't have to use Kafka as the input system.

The current proposal provides a solution for jobs that currently use GroupByPartition. The proposal can be extended to support jobs that use other groupers predefined in Samza.
A custom SSP grouper needs to handle partition expansion similarly to how GroupByPartitionFixedTaskNum handles it, and it is the user's responsibility to update their custom grouper implementation.

> * Regarding storing the SSP-to-Task assignment in the coordinator stream: today, the JobModel encapsulates the data model in Samza, which also includes **TaskModels**. A TaskModel typically shows the task-to-sspList mapping. What is the reason for using a separate coordinator stream message, *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in the coordinator stream today? The reason locality exists outside of the JobModel is that *locality* information is written by each container, whereas it is consumed only by the leader jobcoordinator/AM.
> In this case, the writer and the reader of the mapping information are both still the leader jobcoordinator/AM. So, I want to understand the motivation for this choice.

Yes, the reason for using a separate coordinator stream message is that the task-to-sspList mapping is not currently persisted in the coordinator stream. We wouldn't need to create this new stream message if the JobModel were persisted. We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream, regardless of how many times its partitions have been expanded. Does this make sense?

I am not sure how this is related to locality, though.
Can you clarify your question if I haven't answered it?

Thanks!
Dong

> Cheers!
> Navina
>
> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi all,
> >
> > We created SEP-5: Enable partition expansion of input streams. Please find the SEP wiki at
> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> >
> > Your feedback is appreciated!
> >
> > Thanks,
> > Dong

--
We are hiring in Streams Infra (Kafka/Samza/Datastream) !!