Hey Yi, Navina,

I have updated the SEP-5 document based on our discussion. The diff can be found here <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>. Here is the summary of changes:
- Add a new interface that extends the existing interface SystemStreamPartitionGrouper. Newly-added grouper classes should implement this interface.
- Explained in the Rejected Alternative Section why we don't add a new method in the existing interface.
- Explained in the Rejected Alternative Section why we don't add a config/class for the user to specify the new-partition to old-partition mapping.

Can you take another look at the proposal and let me know if there is any concern?

Cheers,
Dong

On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Yi,
>
> Thanks much for the comment. I have updated the doc to address all your comments except the one related to the interface. I am not sure I understand your suggestion of the new interface. Will discuss tomorrow.
>
> Thanks,
> Dong
>
> On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
>
>> Hi, Dong,
>>
>> Thanks for the detailed design doc for a long-awaited feature in Samza! Really appreciate it! I did a quick pass and have the following comments:
>>
>> - minor: "limit the maximum size of partition" ==> "limit the maximum size of each partition"
>> - "However, Samza currently is not able to handle partition expansion of the input streams" ==> better to point out "for stateful jobs". For stateless jobs, simply bouncing the job now can pick up the new partitions.
>> - "it is possible (e.g. with Kafka) that messages with a given key exists in both partition 1 and 3. Because GroupByPartition will assign partition 1 and 3 to different tasks, messages with the same key may be handled by different task/container/process and their state will be stored in different changelog partition." The problem statement is not super clear here. The issue with stateful jobs is: after GroupByPartition assigns partition 1 and 3 to different tasks, the new task handling partition 3 does not have the previous state to resume the work. E.g. a page-key based counter would start from 0 in the new task for a specific key, instead of resuming the previous count of 50 held by task 1.
>> - minor rewording: "the first solution in this doc" ==> "the solution proposed in this doc"
>> - "Thus additional development work is needed in Kafka to meet this requirement": it would be good to link to a KIP if and when it exists.
>> - Instead of touching/deprecating the interface SystemStreamPartitionGrouper, I would recommend having a different implementation class of the interface whose constructor takes two parameters: a) the previous task number read from the coordinator stream; b) the configured new-partition to old-partition mapping policy. Then the grouper's interface method stays the same, and the behavior of the grouper is more configurable, which is good for supporting a broader set of use cases in addition to Kafka's built-in partition expansion policies.
>> - Minor renaming suggestion for the new grouper class names: GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum
>>
>> Thanks!
>>
>> - Yi
>>
>> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Navina,
>> >
>> > Thanks much for the comment. Please see my response below.
>> >
>> > Regarding your biggest gripe with the SEP, I personally think the operational requirements proposed in the SEP are pretty general and could be easily enforced by other systems.
>> > The reason is that the modulo operation is pretty standard and the default option when we choose a partition. And usually the underlying system allows the user to select an arbitrary partition number if it supports partition expansion. Do you know any system that does not meet these two requirements?
>> >
>> > Regarding your comment on the Motivation section, I renamed the first section as "Problem and Goal" and specified that "*The goal of this proposal is to enable partition expansion of the input streams*". I also put a sentence at the end of the Motivation section that "*The feature of task expansion is out of the scope of this proposal and will be addressed in a future SEP*". The second paragraph in the Motivation section is mainly used to explain the thinking process that we have gone through, what other alternatives we have considered, and what we plan to do in Samza in the next step.
>> >
>> > To answer your question about why increasing the partition number will increase the throughput of the Kafka consumer in the container: the Kafka consumer can potentially fetch more data in one FetchResponse with more partitions in the FetchRequest. This is because we limit the maximum amount of data that can be fetched for a given partition in the FetchResponse. This is by default set to 1 MB, and there are reasons why we cannot arbitrarily bump up this limit.
>> >
>> > To answer your question about how partition expansion in Kafka impacts the clients: the Kafka consumer is able to automatically detect new partitions of the topic and reassign all (both old and new) partitions across consumers in the consumer group IF you tell the consumer which topic to subscribe to. But the consumer in Samza's container uses another way of subscription: instead of subscribing to the topic, it subscribes to specific partitions of the topic. In this case, if new partitions have been added, Samza will need to explicitly subscribe to the new partitions of the topic. The "Handle partition expansion while tasks are running" section in the SEP addresses this issue in Samza -- it recalculates the job model and restarts the containers so that the consumers can subscribe to the new partitions.
>> >
>> > I will ask other devs to take a look at the proposal. I will start the voting thread tomorrow if there is no further concern with the SEP.
>> >
>> > Thanks!
>> > Dong
>> >
>> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
>> >
>> > > Hey Dong,
>> > >
>> > > > I have updated the motivation section to clarify this.
>> > >
>> > > Thanks for updating the motivation. Couple of notes here:
>> > >
>> > > 1.
>> > > > "The motivation of increasing partition number of Kafka topic includes 1) limit the maximum size of a partition in order to improve broker performance and 2) increase throughput of Kafka consumer in the Samza container."
>> > >
>> > > It's unclear to me how increasing the partition number will increase the throughput of the Kafka consumer in the container. Theoretically, you will still be consuming the same amount of data in the container, irrespective of whether it is coming from one partition or more than one expanded partition. Can you please explain here what you mean by that?
>> > >
>> > > 2. I believe the second paragraph under motivation is simply talking about the scope of the current SEP. It will be easier to read if you call out what solution is included in this SEP and what is left out as not in scope (for example, whether expansion for stateful jobs is supported or not).
>> > >
>> > > > We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
>> > >
>> > > Yes. It does!
>> > >
>> > > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?
>> > >
>> > > It's not related. I just meant to give an example of yet another coordinator message that is persisted. Your ssp-to-task mapping is following a similar pattern for persisting. Just wanted to clarify that.
>> > >
>> > > > Can you let me know if this, together with the answers in the previous email, addresses all your questions?
>> > >
>> > > Yes. I believe you have addressed most of my questions. Thanks for taking the time to do that.
>> > >
>> > > > Is there a specific question you have regarding partition expansion in Kafka?
>> > >
>> > > I guess my questions are on how partition expansion in Kafka impacts the clients. Iiuc, partition expansions are done manually in Kafka based on the bytes-in rate of the partition. Do the existing Kafka clients handle this expansion automatically? If yes, how does it work? If not, are there plans to support it in the future?
>> > >
>> > > > Thus user's job should not need to bootstrap key/value store from the changelog topic.
>> > >
>> > > Why is this discussion relevant here? Key/value store / changelog topic partition is scoped within the context of a task. Since we are not changing the number of tasks, I don't think it is required to mention it here.
>> > >
>> > > > The new method takes the SystemStreamPartition-to-Task assignment from the previous job model which can be read from the coordinator stream.
>> > >
>> > > JobModel is currently not persisted to the coordinator stream. In your design, you talk about writing separate coordinator messages for ssp-to-task assignments. Hence, please correct this statement. It is kind of misleading to the reader.
>> > >
>> > > My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system and yet, we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design.) I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view on this from other devs.
>> > >
>> > > Please make sure you have enough eyes on this SEP. If you do, please start a VOTE thread to approve this SEP.
>> > >
>> > > Thanks!
>> > > Navina
>> > >
>> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:
>> > >
>> > > > Hey Navina,
>> > > >
>> > > > I have updated the wiki based on your suggestion.
>> > > > More specifically, I have made the following changes:
>> > > >
>> > > > - Improved the Problem section and Motivation section to describe why we use the solution in this proposal instead of tackling the problem of task expansion directly.
>> > > >
>> > > > - Illustrated the design in a way that doesn't bind to Kafka. Kafka is only used as an example to illustrate why we want to support partition expansion and whether the operational requirement can be supported when Kafka is used as the input system. Note that the proposed solution should work for any input system that meets the operational requirement described in the wiki.
>> > > >
>> > > > - Fixed the problem in the figure.
>> > > >
>> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki. Together with GroupByPartitionFixedTaskNum, it should ensure that we have a solution to enable partition expansion for all users that are using a pre-defined grouper in Samza. Note that those users who use a custom grouper would need to update their implementation.
>> > > >
>> > > > Can you let me know if this, together with the answers in the previous email, addresses all your questions? Thanks for taking the time to review the proposal.
>> > > >
>> > > > Regards,
>> > > > Dong
>> > > >
>> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > >
>> > > > > Hey Navina,
>> > > > >
>> > > > > Thanks much for your comments. Please see my reply inline.
>> > > > >
>> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
>> > > > >
>> > > > >> Thanks for the SEP, Dong. I have a couple of questions to understand your proposal better:
>> > > > >>
>> > > > >> * Under motivation, you mention that "_We expect this solution to work similarly with other input system as well._", yet I don't see any discussion on how it will work with other input systems. That is, what kind of contract does Samza expect from other input systems? If we are not planning to provide a generic solution, it might be worth calling it out in the SEP.
>> > > > >>
>> > > > > I think the contract we expect from other systems is exactly the operational requirements mentioned in the SEP, i.e. partitions should always be doubled and the hash algorithm should modulo the number of partitions. SEP-5 should also allow partition expansion for all input systems that meet these two requirements. I have updated the motivation section to clarify this.
>> > > > >
>> > > > >> * I understand the partition mapping logic you have proposed. But I think the example explanation doesn't match the diagram. In the diagram, after expansion, partition-0 and partition-1 are pointing to bucket 0 and partition-3 and partition-4 are pointing to bucket 1. I think the former has to be partition-0 and partition-2, and the latter partition-1 and partition-3.
>> > > > >> If I am wrong, please help me understand the logic :)
>> > > > >>
>> > > > > Good catch. I will update the figure to fix this problem.
>> > > > >
>> > > > >> * I don't know how partition expansion in Kafka works. I am familiar with how shard splitting happens in Kinesis - there is a hierarchical relation between the parent and child shards. This way, it will also allow the shards to be merged back. Iiuc, Kafka only supports partition "expansion", as opposed to "splits". Can you provide some context or a link related to how partition expansion works in Kafka?
>> > > > >>
>> > > > > I couldn't find any wiki on partition expansion in Kafka. The partition expansion logic in Kafka is very simple -- it simply adds new partitions to the existing topic. Is there a specific question you have regarding partition expansion in Kafka?
>> > > > >
>> > > > >> * Are you only recommending that expansion can be supported for Samza jobs that use Kafka as the input system **and** configure the SSPGrouper as GroupByPartitionFixedTaskNum? Sounds to me like this only applies to GroupByPartition. Please correct me if I am wrong. What is the expectation for custom SSP Groupers?
>> > > > >>
>> > > > > The expansion can be supported for Samza jobs if the input system meets the operational requirement mentioned above. It doesn't have to use Kafka as the input system.
>> > > > >
>> > > > > The current proposal provides a solution for jobs that currently use GroupByPartition. The proposal can be extended to support jobs that use other groupers that are pre-defined in Samza. A custom SSP grouper needs to handle partition expansion similarly to how GroupByPartitionFixedTaskNum handles it, and it is the users' responsibility to update their custom grouper implementation.
>> > > > >
>> > > > >> * Regarding storing the SSP-to-Task assignment to the coordinator stream: Today, the JobModel encapsulates the data model in Samza, which also includes **TaskModels**. TaskModel typically shows the task-to-sspList mapping. What is the reason for using a separate coordinator stream message *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in the coordinator stream today? The reason locality exists outside of the JobModel is because *locality* information is written by each container, whereas it is consumed only by the leader jobcoordinator/AM. In this case, the writer of the mapping information and the reader is still the leader jobcoordinator/AM. So, I want to understand the motivation for this choice.
>> > > > >>
>> > > > > Yes, the reason for using a separate coordinator stream message is that the task-to-sspList mapping is not currently persisted in the coordinator stream. We wouldn't need to create this new stream message if the JobModel were persisted.
>> > > > > We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
>> > > > >
>> > > > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?
>> > > > >
>> > > > > Thanks!
>> > > > > Dong
>> > > > >
>> > > > >> Cheers!
>> > > > >> Navina
>> > > > >>
>> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > >>
>> > > > >> > Hi all,
>> > > > >> >
>> > > > >> > We created SEP-5: Enable partition expansion of input streams. Please find the SEP wiki in the link https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
>> > > > >> >
>> > > > >> > Your feedback is appreciated!
>> > > > >> >
>> > > > >> > Thanks,
>> > > > >> > Dong
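
For anyone who wants to see the fixed-task-number idea from the thread in code form, below is a rough, self-contained sketch of the grouping logic being discussed. It deliberately does not use the real Samza grouper API: the class name FixedTaskNumGrouperSketch, the plain String/int types, the sample stream name PageViewEvent, and the assumption that the grouper is handed the original per-stream partition counts (e.g. recovered from the task-to-sspList mapping persisted in the coordinator stream) are all illustrative, not part of SEP-5 itself.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only -- not the actual Samza API or the SEP-5 implementation.
public class FixedTaskNumGrouperSketch {

  // Stream name -> number of partitions the stream had when the job first started.
  // In the SEP this would be derived from the persisted task-to-sspList mapping.
  private final Map<String, Integer> originalPartitionCounts;

  public FixedTaskNumGrouperSketch(Map<String, Integer> originalPartitionCounts) {
    this.originalPartitionCounts = originalPartitionCounts;
  }

  // Groups every (stream, partition) pair into a task. A partition added by
  // expansion is assigned to the task that owns (partition mod originalCount),
  // so the number of tasks -- and the changelog partition holding each key's
  // state -- stays fixed across expansions.
  public Map<String, Set<String>> group(Map<String, Integer> currentPartitionCounts) {
    Map<String, Set<String>> taskToPartitions = new HashMap<>();
    for (Map.Entry<String, Integer> entry : currentPartitionCounts.entrySet()) {
      String stream = entry.getKey();
      int originalCount = originalPartitionCounts.getOrDefault(stream, entry.getValue());
      for (int partition = 0; partition < entry.getValue(); partition++) {
        // Key step: expanded partitions fold back onto the original task set.
        String task = "Partition " + (partition % originalCount);
        taskToPartitions.computeIfAbsent(task, k -> new HashSet<>())
                        .add(stream + "-" + partition);
      }
    }
    return taskToPartitions;
  }

  public static void main(String[] args) {
    // PageViewEvent originally had 2 partitions and has been expanded to 4:
    // partitions 2 and 3 are grouped with the tasks that already own 0 and 1,
    // i.e. Partition 0 -> {PageViewEvent-0, PageViewEvent-2} and
    //      Partition 1 -> {PageViewEvent-1, PageViewEvent-3}.
    FixedTaskNumGrouperSketch grouper =
        new FixedTaskNumGrouperSketch(Map.of("PageViewEvent", 2));
    System.out.println(grouper.group(Map.of("PageViewEvent", 4)));
  }
}

With the operational requirements discussed above (partition count always doubled, producers choosing the partition by hashing the key modulo the partition count), a key that used to land in partition 1 of a 2-partition stream can only land in partition 1 or partition 3 after expansion to 4 partitions, and both fold back to the same task in the sketch, so that task keeps receiving all messages for the key and its existing changelog partition remains valid.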