Hey Jacob, Navina, Yi, I am wondering whether my answers have addressed your concerns. Can you let me know if you have any remaining concerns with the SEP?
Thanks,
Dong

On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com> wrote:
> Hey Jacob,
>
> Thanks for taking time to review the SEP.
>
> I agree with you and Navina that the current SEP doesn't provide support for arbitrary input systems and it doesn't support partition shrink. I think the scope of this SEP is to support partition expansion for Kafka (the most widely used input system of Samza) and keep the door open for partition support of various input systems. The current design can support any system that meets the two operational requirements specified in the doc.
>
> While it is possible to support more types of input systems, it will likely add more complexity to the design. For example, the first alternative solution from you requires broker-side support to negotiate the hash algorithm. The second alternative solution requires changelog partition reshuffling, which carries its own design complexity and performance overhead. There is a tradeoff between generality and complexity among these choices. I like the current design because it is simple and addresses a big usage scenario for us. We can add more complexity to generalize the design if it enables important use-cases. Does this sound reasonable?
>
> Note that the "Rejected Alternative" section also mentions the possibility of supporting a wider range of input systems by allowing users to specify the new-partition to old-partition mapping. We are not doing it because 1) we may have a better understanding of the design after we have a specific second input system to support, and 2) the current design can be extended to support general input systems. I think a similar argument can be applied to explain why we don't have to support general input systems using the potentially-good alternatives you mentioned.
>
> I hope SEP-5 can be an important first step towards supporting partition expansion of any input system.
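The two operational requirements Dong describes above can be illustrated with a small sketch (illustrative code, not Samza or Kafka internals; the key names and hash function are invented for the example): if the partition count only ever doubles and producers choose partitions via hash(key) % partitionCount, then the original partition of any key is always recoverable as newPartition % originalCount, because the original count divides the new count.

```java
public class PartitionInvariant {
    // Stand-in for a producer's hash+modulo partitioner (illustrative only).
    static int partitionFor(String key, int partitionCount) {
        return (key.hashCode() & 0x7fffffff) % partitionCount;
    }

    public static void main(String[] args) {
        int oldCount = 4;
        int newCount = 16; // doubled twice, so oldCount still divides newCount
        for (String key : new String[] {"user-1", "user-2", "page-view", "click"}) {
            // Since oldCount divides newCount:
            // (hash % newCount) % oldCount == hash % oldCount
            if (partitionFor(key, newCount) % oldCount != partitionFor(key, oldCount)) {
                throw new AssertionError("invariant violated for " + key);
            }
        }
        System.out.println("each new partition maps back to new partition % old count");
    }
}
```

This is why a grouper can keep the task count fixed: knowing only the original partition count, it can route every expanded partition to the task that owned the keys before expansion.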
> To answer your questions about the current proposal:
>
> > 1. "An alternative solution is to allow task number to increase after partition expansion and uses a proper task-to-container assignment to make sure the Samza output is correct." What does the container have to do with stateful processing or output in general?
>
> The task-to-container assignment matters because the correlated tasks (i.e. tasks that consume messages with the same key) need to be in the same container so that they can share the same key/value local store on the same physical machine.
>
> > 2. When you use "Join" as an example, you basically mean multiple co-partitioned streams, right? This is opposed to multiple, independently-partitioned streams or a single stream. Would be nice to formulate the proposal in these more general terms.
>
> I thought "join" is commonly used to refer to the join operation on co-partitioned streams, but I may be wrong. I have updated the wiki to explicitly mention "co-partitioned stream". Does this look better now?
>
> > 3. When switching SSP groupers, how will the users avoid the org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues exception?
>
> I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that the exception will not be thrown if the new grouper is GroupByPartitionWithFixedTaskNum and the old grouper is GroupByPartition. Does this look reasonable?
>
> > 4. Partition to task assignment is meaningless without key to partition mapping. The real semantics are captured in the external requirement for partitioning via hash+modulo. But in that case, iiuc, only the partition count matters. So why not just store the original partition count rather than the whole mapping?
> I think storing the previous task-to-partition mapping is more general than storing the partition count of all topics, for the following reasons:
>
> - Samza already stores the task-to-container mapping and container-to-host mapping in the coordinator stream. It seems consistent to also store the partition-to-task mapping. And this information may be useful for other use-cases such as debugging.
>
> - By having the new interface take the previous task-to-partition assignment instead of a topic-to-partition-count mapping as the new parameter, we can potentially have grouper implementations that support other types of input systems.
>
> - It is slightly simpler to store the task-to-partition assignment because we don't need to know whether this is the first time a job is started or not. On the other hand, you could write the topic-to-partition-count mapping to the coordinator stream only if this is the first time the job is run.
>
> Thanks,
> Dong
>
> On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>> Hey Dong,
>>
>> Thanks for the SEP. Supporting partition changes is critically important for stateful Samza jobs, so it's great to see some ideas on that front!
>>
>> Sorry for the late feedback, but I have a few thoughts to contribute.
>>
>> Big +1 on Navina's comment:
>>
>> > My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system and yet, we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design). I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view for this from other devs.
>>
>> Two examples of this:
>> 1. This is mostly a hypothetical, but some message brokers may use key range assignment rather than hash+modulo.
>> 2. Kafka can't reduce the number of partitions, but it can happen on other systems. For example, it may be cheaper to reduce the number of partitions on a hosted service where the cost model depends on the number of partitions/shards.
>>
>> It seems to me that a solution which doesn't depend on partition key assignment in the message broker would be preferable. Here are a few alternatives that weren't discussed and I think should be considered:
>>
>> Alternatives in order of increasing preference:
>> 1. Samza manages the partition hash (via some new contract with the brokers) and guarantees correct routing of keys among the new partitions.
>> 2. Samza detects a task count change, creates a new changelog with correct partitions, and *somehow* reshuffles all existing changelog data into the new topic and then uses the new topic from then on. (doesn't work without a changelog, but in that case durability isn't paramount, so we can just wipe)
>> 3. Use RPC in between stages and Samza fully manages key assignment among tasks. No on-disk topic data to clean up. Mandatory repartitioning in the first stage to pre-scaled tasks in the next stage.
>> 4. Combined 2-3 solution
>>
>> Finally, some questions about the current proposal:
>> 1. "An alternative solution is to allow task number to increase after partition expansion and uses a proper task-to-container assignment to make sure the Samza output is correct." What does the container have to do with stateful processing or output in general?
>> 2. When you use "Join" as an example, you basically mean multiple co-partitioned streams, right? This is opposed to multiple, independently-partitioned streams or a single stream. Would be nice to formulate the proposal in these more general terms.
>> 3. When switching SSP groupers, how will the users avoid the org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues exception?
>> 4. Partition to task assignment is meaningless without key to partition mapping. The real semantics are captured in the external requirement for partitioning via hash+modulo. But in that case, iiuc, only the partition count matters. So why not just store the original partition count rather than the whole mapping?
>>
>> -Jake
>>
>> On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > Hey Yi, Navina,
>> >
>> > I have updated the SEP-5 document based on our discussion. The difference can be found here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>. Here is the summary of changes:
>> >
>> > - Add a new interface that extends the existing interface SystemStreamPartitionGrouper. Newly-added grouper classes should implement this interface.
>> > - Explained in the Rejected Alternative section why we don't add a new method in the existing interface.
>> > - Explained in the Rejected Alternative section why we don't add a config/class for users to specify the new-partition to old-partition mapping.
>> >
>> > Can you take another look at the proposal and let me know if there is any concern?
>> >
>> > Cheers,
>> > Dong
>> >
>> > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com> wrote:
>> > > Hey Yi,
>> > >
>> > > Thanks much for the comment. I have updated the doc to address all your comments except the one related to the interface. I am not sure I understand your suggestion of the new interface. Will discuss tomorrow.
>> > > Thanks,
>> > > Dong
>> > >
>> > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> > >> Hi, Don,
>> > >>
>> > >> Thanks for the detailed design doc for a long-awaited feature in Samza! Really appreciate it! I did a quick pass and have the following comments:
>> > >>
>> > >> - minor: "limit the maximum size of partition" ==> "limit the maximum size of each partition"
>> > >> - "However, Samza currently is not able to handle partition expansion of the input streams" ==> better point out "for stateful jobs". For stateless jobs, simply bouncing the job now can pick up the new partitions.
>> > >> - "it is possible (e.g. with Kafka) that messages with a given key exist in both partition 1 and 3. Because GroupByPartition will assign partition 1 and 3 to different tasks, messages with the same key may be handled by different task/container/process and their state will be stored in different changelog partitions." The problem statement is not super clear here. The issue with stateful jobs is: after GroupByPartition assigns partitions 1 and 3 to different tasks, the new task handling partition 3 does not have the previous state to resume the work. e.g. a page-key based counter would start from 0 in the new task for a specific key, instead of resuming the previous count 50 held by task 1.
>> > >> - minor rewording: "the first solution in this doc" ==> "the solution proposed in this doc"
>> > >> - "Thus additional development work is needed in Kafka to meet this requirement" It would be good to link to a KIP if and when it exists.
>> > >> - Instead of touching/deprecating the interface SystemStreamPartitionGrouper, I would recommend having a different implementation class of the interface which, in the constructor of the grouper, takes two parameters: a) the previous task number read from the coordinator stream; b) the configured new-partition to old-partition mapping policy. Then, the grouper's interface method stays the same and the behavior of the grouper is more configurable, which is good for supporting a broader set of use cases in addition to Kafka's built-in partition expansion policies.
>> > >> - Minor renaming suggestion for the new grouper class names: GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum
>> > >>
>> > >> Thanks!
>> > >>
>> > >> - Yi
>> > >>
>> > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:
>> > >> > Hey Navina,
>> > >> >
>> > >> > Thanks much for the comment. Please see my response below.
>> > >> >
>> > >> > Regarding your biggest gripe with the SEP, I personally think the operational requirements proposed in the SEP are pretty general and could be easily enforced by other systems. The reason is that the modulo operation is pretty standard and the default option when we choose a partition. And usually the underlying system allows users to select an arbitrary partition number if it supports partition expansion. Do you know any system that does not meet these two requirements?
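Yi's constructor-based suggestion above could look roughly like the following sketch (the class, method, and policy names here are invented for illustration; the real SystemStreamPartitionGrouper interface operates on SystemStreamPartition objects and returns task-to-partition maps, not bare ints and strings):

```java
// Pluggable policy mapping a new partition back to its original partition
// (hypothetical name, illustrating Yi's "mapping policy" parameter).
interface PartitionMappingPolicy {
    int originalPartition(int newPartition, int previousTaskCount);
}

public class ConfigurableFixedTaskNumGrouper {
    private final int previousTaskCount;          // would be read from the coordinator stream
    private final PartitionMappingPolicy policy;  // configured mapping policy

    public ConfigurableFixedTaskNumGrouper(int previousTaskCount, PartitionMappingPolicy policy) {
        this.previousTaskCount = previousTaskCount;
        this.policy = policy;
    }

    // Route each partition to the task that owned its original partition,
    // keeping the task count fixed across expansions.
    public String taskFor(int partition) {
        return "Partition " + policy.originalPartition(partition, previousTaskCount);
    }

    public static void main(String[] args) {
        // Doubling policy matching the SEP's requirements:
        // original partition = new partition % previous task count.
        ConfigurableFixedTaskNumGrouper grouper =
                new ConfigurableFixedTaskNumGrouper(2, (p, n) -> p % n);
        System.out.println(grouper.taskFor(3)); // partition 3 routes to the task for partition 1
    }
}
```

The design point being sketched: because the mapping policy is injected through the constructor, the existing grouper interface method is untouched, and non-Kafka expansion policies can be plugged in later.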
>> > >> > Regarding your comment on the Motivation section, I renamed the first section to "Problem and Goal" and specified that "*The goal of this proposal is to enable partition expansion of the input streams*." I also put a sentence at the end of the Motivation section that "*The feature of task expansion is out of the scope of this proposal and will be addressed in a future SEP*". The second paragraph in the Motivation section is mainly used to explain the thinking process that we have gone through, what other alternatives we have considered, and what we plan to do in Samza in the next step.
>> > >> >
>> > >> > To answer your question why increasing the partition number will increase the throughput of the Kafka consumer in the container: the Kafka consumer can potentially fetch more data in one FetchResponse with more partitions in the FetchRequest. This is because we limit the maximum amount of data that can be fetched for a given partition in the FetchResponse. This by default is set to 1 MB. And there are reasons that we cannot arbitrarily bump up this limit.
>> > >> >
>> > >> > To answer your question how partition expansion in Kafka impacts the clients: the Kafka consumer is able to automatically detect new partitions of the topic and reassign all (both old and new) partitions across consumers in the consumer group IF you tell the consumer the topic to be subscribed to. But the consumer in Samza's container uses another way of subscription. Instead of subscribing to the topic, the consumer in Samza's container subscribes to the specific partitions of the topic.
>> > >> > In this case, if new partitions have been added, Samza will need to explicitly subscribe to the new partitions of the topic. The "Handle partition expansion while tasks are running" section in the SEP addresses this issue in Samza -- it recalculates the job model and restarts the containers so that consumers can subscribe to the new partitions.
>> > >> >
>> > >> > I will ask other devs to take a look at the proposal. I will start the voting thread tomorrow if there is no further concern with the SEP.
>> > >> >
>> > >> > Thanks!
>> > >> > Dong
>> > >> >
>> > >> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
>> > >> > > Hey Dong,
>> > >> > >
>> > >> > > > I have updated the motivation section to clarify this.
>> > >> > >
>> > >> > > Thanks for updating the motivation. Couple of notes here:
>> > >> > >
>> > >> > > 1.
>> > >> > > > "The motivation of increasing partition number of Kafka topic includes 1) limit the maximum size of a partition in order to improve broker performance and 2) increase throughput of Kafka consumer in the Samza container."
>> > >> > >
>> > >> > > It's unclear to me how increasing the partition number will increase the throughput of the kafka consumer in the container. Theoretically, you will still be consuming the same amount of data in the container, irrespective of whether it is coming from one partition or more than one expanded partition. Can you please explain what you mean by that?
>> > >> > >
>> > >> > > 2. I believe the second paragraph under motivation is simply talking about the scope of the current SEP.
>> > >> > > It will be easier to read if you state what solution is included in this SEP and what is left out as not in scope (for example, whether expansion for stateful jobs is supported or not).
>> > >> > >
>> > >> > > > We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
>> > >> > >
>> > >> > > Yes. It does!
>> > >> > >
>> > >> > > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?
>> > >> > >
>> > >> > > It's not related. I just meant to give an example of yet another coordinator message that is persisted. Your ssp-to-task mapping is following a similar pattern for persisting. Just wanted to clarify that.
>> > >> > >
>> > >> > > > Can you let me know if this, together with the answers in the previous email, addresses all your questions?
>> > >> > >
>> > >> > > Yes. I believe you have addressed most of my questions. Thanks for taking time to do that.
>> > >> > >
>> > >> > > > Is there specific question you have regarding partition expansion in Kafka?
>> > >> > >
>> > >> > > I guess my questions are on how partition expansion in Kafka impacts the clients. Iiuc, partition expansions are done manually in Kafka based on the bytes-in rate of the partition. Do the existing kafka clients handle this expansion automatically? If yes, how does it work? If not, are there plans to support it in the future?
>> > >> > > > Thus user's job should not need to bootstrap the key/value store from the changelog topic.
>> > >> > >
>> > >> > > Why is this discussion relevant here? Key/value store / changelog topic partition is scoped to the context of a task. Since we are not changing the number of tasks, I don't think it is required to mention it here.
>> > >> > >
>> > >> > > > The new method takes the SystemStreamPartition-to-Task assignment from the previous job model which can be read from the coordinator stream.
>> > >> > >
>> > >> > > The JobModel is currently not persisted to the coordinator stream. In your design, you talk about writing separate coordinator messages for ssp-to-task assignments. Hence, please correct this statement. It is kind of misleading to the reader.
>> > >> > >
>> > >> > > My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system and yet, we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design). I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view for this from other devs.
>> > >> > >
>> > >> > > Please make sure you have enough eyes on this SEP. If you do, please start a VOTE thread to approve this SEP.
>> > >> > >
>> > >> > > Thanks!
>> > >> > > Navina
>> > >> > >
>> > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:
>> > >> > > > Hey Navina,
>> > >> > > >
>> > >> > > > I have updated the wiki based on your suggestion. More specifically, I have made the following changes:
>> > >> > > >
>> > >> > > > - Improved the Problem section and Motivation section to describe why we use the solution in this proposal instead of tackling the problem of task expansion directly.
>> > >> > > >
>> > >> > > > - Illustrated the design in a way that doesn't bind to Kafka. Kafka is only used as an example to illustrate why we want to support partition expansion and whether the operational requirements can be supported when Kafka is used as the input system. Note that the proposed solution should work for any input system that meets the operational requirements described in the wiki.
>> > >> > > >
>> > >> > > > - Fixed the problem in the figure.
>> > >> > > >
>> > >> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki. Together with GroupByPartitionFixedTaskNum, it should ensure that we have a solution to enable partition expansion for all users that are using pre-defined groupers in Samza. Note that those users who use a custom grouper would need to update their implementation.
>> > >> > > >
>> > >> > > > Can you let me know if this, together with the answers in the previous email, addresses all your questions? Thanks for taking time to review the proposal.
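A hypothetical simplification of what the newly mentioned SSP-based grouper class might do (the method, class name spelling, and task-name string format below are illustrative only; real Samza groupers return maps from TaskName to sets of SystemStreamPartitions): each system-stream-partition gets its own task, but expanded partitions map back to the task of their original partition so the task count stays fixed.

```java
public class GroupBySspFixedTaskNumSketch {
    // Illustrative stand-in for a task name keyed by (system, stream,
    // original partition). Expanded partitions fold back via modulo, so no
    // new tasks are created after expansion.
    static String taskFor(String system, String stream, int partition, int originalPartitionCount) {
        return "SystemStreamPartition [" + system + ", " + stream + ", "
                + (partition % originalPartitionCount) + "]";
    }

    public static void main(String[] args) {
        // Partition 5 of a topic expanded from 4 to 8 partitions joins the
        // task that owned partition 1 (since 5 % 4 == 1).
        System.out.println(taskFor("kafka", "PageViewEvent", 5, 4));
    }
}
```

This mirrors the per-partition variant: the only difference is that the task identity also carries the system and stream names, so co-partitioned streams do not share tasks.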
>> > >> > > > Regards,
>> > >> > > > Dong
>> > >> > > >
>> > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > >> > > > > Hey Navina,
>> > >> > > > >
>> > >> > > > > Thanks much for your comments. Please see my reply inline.
>> > >> > > > >
>> > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
>> > >> > > > >> Thanks for the SEP, Dong. I have a couple of questions to understand your proposal better:
>> > >> > > > >>
>> > >> > > > >> * Under motivation, you mention that "_We expect this solution to work similarly with other input system as well._", yet I don't see any discussion on how it will work with other input systems. That is, what kind of contract does samza expect from other input systems? If we are not planning to provide a generic solution, it might be worth calling it out in the SEP.
>> > >> > > > >
>> > >> > > > > I think the contract we expect from other systems is exactly the operational requirements mentioned in the SEP, i.e. partitions should always be doubled and the hash algorithm should take the hash modulo the number of partitions. SEP-5 should also allow partition expansion of all input systems that meet these two requirements. I have updated the motivation section to clarify this.
>> > >> > > > >
>> > >> > > > >> * I understand the partition mapping logic you have proposed.
>> > >> > > > >> But I think the example explanation doesn't match the diagram. In the diagram, after expansion, partition-0 and partition-1 are pointing to bucket 0 and partition-3 and partition-4 are pointing to bucket 1. I think the former has to be partition-0 and partition-2 and the latter, partition-1 and partition-3. If I am wrong, please help me understand the logic :)
>> > >> > > > >
>> > >> > > > > Good catch. I will update the figure to fix this problem.
>> > >> > > > >
>> > >> > > > >> * I don't know how partition expansion in Kafka works. I am familiar with how shard splitting happens in Kinesis - there is a hierarchical relation between the parent and child shards. This way, it will also allow the shards to be merged back. Iiuc, Kafka only supports partition "expansion", as opposed to "splits". Can you provide some context or a link related to how partition expansion works in Kafka?
>> > >> > > > >
>> > >> > > > > I couldn't find any wiki on partition expansion in Kafka. The partition expansion logic in Kafka is very simple -- it simply adds new partitions to the existing topic. Is there a specific question you have regarding partition expansion in Kafka?
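The corrected figure mapping Navina describes above can be checked in a few lines (an illustrative sketch of the modulo math only, not Samza code): after expanding from 2 to 4 partitions, each new partition p continues to use changelog bucket p % 2, so partitions 0 and 2 share bucket 0 while partitions 1 and 3 share bucket 1.

```java
public class BucketMapping {
    // Changelog bucket for a partition, given the original partition count
    // (which the SEP proposes to derive from the persisted task-to-sspList
    // mapping in the coordinator stream).
    static int bucketFor(int partition, int originalPartitionCount) {
        return partition % originalPartitionCount;
    }

    public static void main(String[] args) {
        // Expanding 2 -> 4 partitions: print the bucket for each new partition.
        for (int p = 0; p < 4; p++) {
            System.out.println("partition-" + p + " -> bucket " + bucketFor(p, 2));
        }
    }
}
```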
>> > >> > > > >> * Are you only recommending that expansion can be supported for samza jobs that use Kafka as input systems **and** configure the SSPGrouper as GroupByPartitionFixedTaskNum? Sounds to me like this only applies for GroupByPartition. Please correct me if I am wrong. What is the expectation for custom SSP Groupers?
>> > >> > > > >
>> > >> > > > > The expansion can be supported for Samza jobs if the input system meets the operational requirements mentioned above. It doesn't have to use Kafka as the input system.
>> > >> > > > >
>> > >> > > > > The current proposal provides a solution for jobs that currently use GroupByPartition. The proposal can be extended to support jobs that use other groupers that are pre-defined in Samza. A custom SSP grouper needs to handle partition expansion similarly to how GroupByPartitionFixedTaskNum handles it, and it is users' responsibility to update their custom grouper implementation.
>> > >> > > > >
>> > >> > > > >> * Regarding storing the SSP-to-Task assignment to the coordinator stream: Today, the JobModel encapsulates the data model in samza which also includes **TaskModels**. TaskModel typically shows the task-to-sspList mapping. What is the reason for using a separate coordinator stream message *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in the coordinator stream today?
>> > >> > > > >> The reason locality exists outside of the jobmodel is because *locality* information is written by each container, whereas it is consumed only by the leader jobcoordinator/AM. In this case, the writer of the mapping information and the reader is still the leader jobcoordinator/AM. So, I want to understand the motivation for this choice.
>> > >> > > > >
>> > >> > > > > Yes, the reason for using a separate coordinator stream message is that the task-to-sspList mapping is not currently persisted in the coordinator stream. We wouldn't need to create this new stream message if the JobModel were persisted. We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
>> > >> > > > >
>> > >> > > > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?
>> > >> > > > >
>> > >> > > > > Thanks!
>> > >> > > > > Dong
>> > >> > > > >
>> > >> > > > >> Cheers!
>> > >> > > > >> Navina
>> > >> > > > >>
>> > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > >> > > > >> > Hi all,
>> > >> > > > >> >
>> > >> > > > >> > We created SEP-5: Enable partition expansion of input streams.
>> > >> > > > >> > Please find the SEP wiki in the link <https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams>.
>> > >> > > > >> >
>> > >> > > > >> > Your feedback is appreciated!
>> > >> > > > >> >
>> > >> > > > >> > Thanks,
>> > >> > > > >> > Dong