Hey Yi,

Thanks much for the comments. I have updated the doc to address all your comments except the one related to the interface. I am not sure I understand your suggestion about the new interface. Will discuss tomorrow.
Thanks,
Dong

On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
> Hi, Dong,
>
> Thanks for the detailed design doc for a long-awaited feature in Samza! Really appreciate it! I did a quick pass and have the following comments:
>
> - minor: "limit the maximum size of partition" ==> "limit the maximum size of each partition"
> - "However, Samza currently is not able to handle partition expansion of the input streams" ==> better to point out "for stateful jobs". For stateless jobs, simply bouncing the job today can pick up the new partitions.
> - "it is possible (e.g. with Kafka) that messages with a given key exist in both partitions 1 and 3. Because GroupByPartition will assign partitions 1 and 3 to different tasks, messages with the same key may be handled by different tasks/containers/processes and their state will be stored in different changelog partitions." The problem statement is not super clear here. The issue with stateful jobs is: after GroupByPartition assigns partitions 1 and 3 to different tasks, the new task handling partition 3 does not have the previous state to resume the work. E.g., a page-key-based counter would start from 0 in the new task for a specific key, instead of resuming the previous count of 50 held by task 1.
> - minor rewording: "the first solution in this doc" ==> "the solution proposed in this doc"
> - "Thus additional development work is needed in Kafka to meet this requirement": it would be good to link to a KIP if and when it exists.
> - Instead of touching/deprecating the interface SystemStreamPartitionGrouper, I would recommend having a different implementation class of the interface whose constructor takes two parameters: a) the previous task number, read from the coordinator stream; b) the configured new-partition to old-partition mapping policy. Then the grouper's interface method stays the same and the behavior of the grouper is more configurable, which is good for supporting a broader set of use cases in addition to Kafka's built-in partition expansion policies. (See the sketch below.)
> - Minor renaming suggestion for the new grouper class names: GroupByPartitionWithFixedTaskNum and GroupBySystemStreamPartitionWithFixedTaskNum
>
> Thanks!
>
> - Yi
>
> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:
> > Hey Navina,
> >
> > Thanks much for the comments. Please see my response below.
> >
> > Regarding your biggest gripe with the SEP, I personally think the operational requirements proposed in the SEP are pretty general and could be easily enforced by other systems. The reason is that the modulo operation is pretty standard and is the default option when choosing a partition, and the underlying system usually allows users to select an arbitrary partition count if it supports partition expansion (so doubling is always possible). Do you know of any system that does not meet these two requirements?
> >
> > Regarding your comment on the Motivation section, I renamed the first section to "Problem and Goal" and specified that "*The goal of this proposal is to enable partition expansion of the input streams*." I also put a sentence at the end of the Motivation section stating that "*The feature of task expansion is out of the scope of this proposal and will be addressed in a future SEP*".
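To make the grouper idea from Yi's comments above more concrete, here is a minimal, illustrative sketch. It assumes Samza's SystemStreamPartitionGrouper interface exposes roughly Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps); the class name, constructor parameter, and the "Partition N" task-naming convention are placeholders rather than the SEP's final API, and the second constructor parameter Yi mentions (the mapping policy) is reduced here to a simple modulo rule.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.samza.container.TaskName;
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
import org.apache.samza.system.SystemStreamPartition;

// Illustrative sketch only: route every (possibly expanded) partition back to one of
// the original tasks by taking partitionId % previousTaskCount, so messages with a
// given key keep landing on the task that already owns their changelog state.
public class GroupByPartitionWithFixedTaskNum implements SystemStreamPartitionGrouper {

  // In the real design this value would be read from the coordinator stream.
  private final int previousTaskCount;

  public GroupByPartitionWithFixedTaskNum(int previousTaskCount) {
    this.previousTaskCount = previousTaskCount;
  }

  @Override
  public Map<TaskName, Set<SystemStreamPartition>> group(Set<SystemStreamPartition> ssps) {
    Map<TaskName, Set<SystemStreamPartition>> groups = new HashMap<>();
    for (SystemStreamPartition ssp : ssps) {
      // e.g. with 2 original tasks, partition 3 maps to task "Partition 1" after expansion to 4 partitions
      int taskId = ssp.getPartition().getPartitionId() % previousTaskCount;
      TaskName taskName = new TaskName("Partition " + taskId);
      groups.computeIfAbsent(taskName, k -> new HashSet<>()).add(ssp);
    }
    return groups;
  }
}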
> > The second paragraph in the Motivation section is mainly used to explain the thinking process that we have gone through, what other alternatives we have considered, and what we plan to do in Samza in the next step.
> >
> > To answer your question about why increasing the partition number increases the throughput of the Kafka consumer in the container: the Kafka consumer can potentially fetch more data in one FetchResponse when there are more partitions in the FetchRequest. This is because we limit the maximum amount of data that can be fetched for a given partition in the FetchResponse, which by default is set to 1 MB, and there are reasons why we cannot arbitrarily bump up this limit.
> >
> > To answer your question about how partition expansion in Kafka impacts the clients: the Kafka consumer is able to automatically detect new partitions of a topic and reassign all (both old and new) partitions across consumers in the consumer group IF you tell the consumer which topic to subscribe to. But the consumer in Samza's container subscribes differently. Instead of subscribing to the topic, the consumer in Samza's container subscribes to specific partitions of the topic. In this case, if new partitions have been added, Samza will need to explicitly subscribe to the new partitions of the topic. The "Handle partition expansion while tasks are running" section in the SEP addresses this issue in Samza -- it recalculates the job model and restarts the containers so that consumers can subscribe to the new partitions.
> >
> > I will ask other devs to take a look at the proposal. I will start the voting thread tomorrow if there is no further concern with the SEP.
> >
> > Thanks!
> > Dong
> >
> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
> > > Hey Dong,
> > >
> > > > I have updated the motivation section to clarify this.
> > >
> > > Thanks for updating the motivation. A couple of notes here:
> > >
> > > 1. "The motivation of increasing partition number of Kafka topic includes 1) limit the maximum size of a partition in order to improve broker performance and 2) increase throughput of Kafka consumer in the Samza container."
> > >
> > > It's unclear to me how increasing the partition number will increase the throughput of the Kafka consumer in the container. Theoretically, you will still be consuming the same amount of data in the container, irrespective of whether it is coming from one partition or more than one expanded partition. Can you please explain here what you mean by that?
> > >
> > > 2. I believe the second paragraph under Motivation is simply talking about the scope of the current SEP. It will be easier to read if you state what solution is included in this SEP and what is left out as not in scope (for example, whether expansion for stateful jobs is supported or not).
> > >
> > > > We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
> > >
> > > Yes. It does!
> > >
> > > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?
> > >
> > > It's not related.
> > > I just meant to give an example of yet another coordinator message that is persisted. Your ssp-to-task mapping follows a similar pattern for persistence. Just wanted to clarify that.
> > >
> > > > Can you let me know if this, together with the answers in the previous email, addresses all your questions?
> > >
> > > Yes. I believe you have addressed most of my questions. Thanks for taking the time to do that.
> > >
> > > > Is there a specific question you have regarding partition expansion in Kafka?
> > >
> > > I guess my questions are about how partition expansion in Kafka impacts the clients. Iiuc, partition expansions are done manually in Kafka based on the bytes-in rate of the partition. Do the existing Kafka clients handle this expansion automatically? If yes, how does it work? If not, are there plans to support it in the future?
> > >
> > > > Thus the user's job should not need to bootstrap the key/value store from the changelog topic.
> > >
> > > Why is this discussion relevant here? The key/value store and changelog topic partition are scoped to the context of a task. Since we are not changing the number of tasks, I don't think it is required to mention it here.
> > >
> > > > The new method takes the SystemStreamPartition-to-Task assignment from the previous job model, which can be read from the coordinator stream.
> > >
> > > The JobModel is currently not persisted to the coordinator stream. In your design, you talk about writing separate coordinator messages for ssp-to-task assignments. Hence, please correct this statement. It is kind of misleading to the reader.
> > >
> > > My biggest gripe with this SEP is that it seems like a tailor-made solution that relies on the semantics of the Kafka system, and yet we are trying to masquerade that as operational requirements for other systems interacting with Samza. (Not to say that this is the first time such a choice is being made in the Samza design.) I am not seeing how this can be a "general" solution for all input systems. That's my two cents. I would like to hear alternative points of view on this from other devs.
> > >
> > > Please make sure you have enough eyes on this SEP. If you do, please start a VOTE thread to approve this SEP.
> > >
> > > Thanks!
> > > Navina
> > >
> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com> wrote:
> > > > Hey Navina,
> > > >
> > > > I have updated the wiki based on your suggestions. More specifically, I have made the following changes:
> > > >
> > > > - Improved the Problem section and Motivation section to describe why we use the solution in this proposal instead of tackling the problem of task expansion directly.
> > > >
> > > > - Illustrated the design in a way that doesn't bind to Kafka. Kafka is only used as an example to illustrate why we want to support partition expansion and whether the operational requirement can be supported when Kafka is used as the input system. Note that the proposed solution should work for any input system that meets the operational requirement described in the wiki.
> > > >
> > > > - Fixed the problem in the figure.
> > > >
> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki.
> > > > Together with GroupByPartitionFixedTaskNum, it should ensure that we have a solution to enable partition expansion for all users that are using a pre-defined grouper in Samza. Note that users who use a custom grouper would need to update their implementation.
> > > >
> > > > Can you let me know if this, together with the answers in the previous email, addresses all your questions? Thanks for taking the time to review the proposal.
> > > >
> > > > Regards,
> > > > Dong
> > > >
> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > Hey Navina,
> > > > >
> > > > > Thanks much for your comments. Please see my reply inline.
> > > > >
> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <nav...@apache.org> wrote:
> > > > > > Thanks for the SEP, Dong. I have a couple of questions to understand your proposal better:
> > > > > >
> > > > > > * Under motivation, you mention that "_We expect this solution to work similarly with other input system as well._", yet I don't see any discussion on how it will work with other input systems. That is, what kind of contract does Samza expect from other input systems? If we are not planning to provide a generic solution, it might be worth calling it out in the SEP.
> > > > >
> > > > > I think the contract we expect from other systems is exactly the operational requirement mentioned in the SEP, i.e. partitions should always be doubled and the hash algorithm should take the key's hash modulo the number of partitions. SEP-5 should also allow partition expansion for all input systems that meet these two requirements. I have updated the motivation section to clarify this.
> > > > >
> > > > > > * I understand the partition mapping logic you have proposed. But I think the example explanation doesn't match the diagram. In the diagram, after expansion, partition-0 and partition-1 are pointing to bucket 0 and partition-3 and partition-4 are pointing to bucket 1. I think the former has to be partition-0 and partition-2, and the latter has to be partition-1 and partition-3. If I am wrong, please help me understand the logic :)
> > > > >
> > > > > Good catch. I will update the figure to fix this problem.
> > > > >
> > > > > > * I don't know how partition expansion in Kafka works. I am familiar with how shard splitting happens in Kinesis - there is a hierarchical relation between the parent and child shards. This way, it also allows the shards to be merged back. Iiuc, Kafka only supports partition "expansion", as opposed to "splits". Can you provide some context or a link related to how partition expansion works in Kafka?
> > > > >
> > > > > I couldn't find any wiki on partition expansion in Kafka. The partition expansion logic in Kafka is very simple -- it just adds new partitions to the existing topic. Is there a specific question you have regarding partition expansion in Kafka?
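As an aside on the doubling-plus-modulo contract described above, here is a small, self-contained check (not from the SEP; the class name and partition counts are arbitrary) illustrating why that requirement is sufficient: whenever the expanded partition count is a multiple of the original count, which repeated doubling guarantees, a message's new partition taken modulo the original partition count recovers its original partition, so a fixed-task-num grouper can route it back to the task that already holds the key's state.

public class PartitionExpansionCheck {
  public static void main(String[] args) {
    int originalPartitions = 4;
    int expandedPartitions = 16; // doubled twice, per the operational requirement

    // Non-negative key hashes for simplicity; real partitioners first map hashes to non-negative values.
    for (int keyHash = 0; keyHash < 1_000_000; keyHash++) {
      int oldPartition = keyHash % originalPartitions;
      int newPartition = keyHash % expandedPartitions;
      // Because expandedPartitions is a multiple of originalPartitions, the new
      // partition always maps back to the original partition under modulo.
      if (newPartition % originalPartitions != oldPartition) {
        throw new AssertionError("mapping broken for hash " + keyHash);
      }
    }
    System.out.println("newPartition % originalPartitions always recovers the original partition");
  }
}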
> > > > > > * Are you only recommending that expansion can be supported for Samza jobs that use Kafka as the input system **and** configure the SSPGrouper as GroupByPartitionFixedTaskNum? Sounds to me like this only applies to GroupByPartition. Please correct me if I am wrong. What is the expectation for custom SSP Groupers?
> > > > >
> > > > > The expansion can be supported for Samza jobs if the input system meets the operational requirement mentioned above. It doesn't have to use Kafka as the input system.
> > > > >
> > > > > The current proposal provides a solution for jobs that currently use GroupByPartition. The proposal can be extended to support jobs that use other groupers that are pre-defined in Samza. A custom SSP grouper needs to handle partition expansion similarly to how GroupByPartitionFixedTaskNum handles it, and it is the users' responsibility to update their custom grouper implementation.
> > > > >
> > > > > > * Regarding storing the SSP-to-Task assignment in the coordinator stream: today, the JobModel encapsulates the data model in Samza, which also includes **TaskModels**. A TaskModel typically shows the task-to-sspList mapping. What is the reason for using a separate coordinator stream message *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in the coordinator stream today? The reason locality exists outside of the JobModel is because *locality* information is written by each container, whereas it is consumed only by the leader jobcoordinator/AM. In this case, the writer of the mapping information and the reader are both the leader jobcoordinator/AM. So, I want to understand the motivation for this choice.
> > > > >
> > > > > Yes, the reason for using a separate coordinator stream message is that the task-to-sspList mapping is not currently persisted in the coordinator stream. We wouldn't need to create this new stream message if the JobModel were persisted. We need to persist the task-to-sspList mapping in the coordinator stream so that the job can derive the original number of partitions of each input stream regardless of how many times the partition has expanded. Does this make sense?
> > > > >
> > > > > I am not sure how this is related to the locality though. Can you clarify your question if I haven't answered your question?
> > > > >
> > > > > Thanks!
> > > > > Dong
> > > > >
> > > > > > Cheers!
> > > > > > Navina
> > > > > >
> > > > > > On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
> > > > > > > Hi all,
> > > > > > >
> > > > > > > We created SEP-5: Enable partition expansion of input streams. Please find the SEP wiki at the link
> > > > > > > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > > > > >
> > > > > > > Your feedback is appreciated!
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
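For readers following Dong's earlier point about why more partitions can raise consumer throughput: the per-partition cap he refers to corresponds to the Kafka consumer's max.partition.fetch.bytes setting (1 MB by default), so a fetch that spans more partitions can return more data without raising the per-partition limit. Below is a minimal, illustrative consumer configuration; the broker address, group id, and the explicit-assignment comment are assumptions for the example, not taken from the SEP.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchLimitExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker for the example
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "sep5-example");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    // Each partition contributes at most this many bytes to a single fetch response
    // (the default is 1 MB), which is why a consumer assigned more partitions can
    // pull more data per fetch without raising the per-partition limit.
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      // A Samza container assigns explicit partitions (consumer.assign(...)) rather
      // than subscribing by topic name, which is why newly added partitions are not
      // picked up automatically and the job model has to be recalculated.
    }
  }
}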