Hey Yi, Navina,

I have updated the SEP-5 document based on our discussion. The difference
can be found here
<https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>.
Here is the summary of changes:

- Add new interface that extends the existing interface
SystemStreamPartitionGrouper. Newly-added grouper class should implement
this interface.
- Explained in the Rejected Alternative Section why we don't add new method
in the existing interface
- Explained in the Rejected Alternative Section why we don't config/class
for user to specify new-partition to old-partition mapping.

Can you take another look at the proposal and let me know if there is any
concern?

Cheers,
Dong


On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Yi,
>
> Thanks much for the comment. I have updated the doc to address all your
> comments except the one related to the interface. I am not sure I
> understand your suggestion of the new interface. Will discuss tomorrow.
>
> Thanks,
> Dong
>
> On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
>
>> Hi, Don,
>>
>> Thanks for the detailed design doc for a long-waited feature in Samza!
>> Really appreciate it! I did a quick pass and have the following comments:
>>
>> - minor: "limit the maximum size of partition" ==> "limit the maximum size
>> of each partition"
>> - "However, Samza currently is not able to handle partition expansion of
>> the input streams"==>better point out "for stateful jobs". For stateless
>> jobs, simply bouncing the job now can pick up the new partitions.
>> - "it is possible (e.g. with Kafka) that messages with a given key exists
>> in both partition 1 an 3. Because GroupByPartition will assign partition 1
>> and 3 to different tasks, messages with the same key may be handled by
>> different task/container/process and their state will be stored in
>> different changelog partition." The problem statement is not super clear
>> here. The issues with stateful jobs is: after GroupByPartition assign
>> partition 1 and 3 to different tasks, the new task handling partition 3
>> does not have the previous state to resume the work. e.g. a page-key based
>> counter would start from 0 in the new task for a specific key, instead of
>> resuming the previous count 50 held by task 1.
>> - minor rewording: "the first solution in this doc" ==> "the solution
>> proposed in this doc"
>> - "Thus additional development work is needed in Kafka to meet this
>> requirement" It would be good to link to a KIP if and when it exists
>> - Instead of touching/deprecating the interface
>> SystemStreamPartitionGrouper, I would recommend to have a different
>> implementation class of the interface, which in the constructor of the
>> grouper, takes two parameters: a) the previous task number read from the
>> coordinator stream; b) the configured new-partition to old-partition
>> mapping policy. Then, the grouper's interface method stays the same and
>> the
>> behavior of the grouper is more configurable which is good to support a
>> broader set of use cases in addition to Kafka's built-in partition
>> expansion policies.
>> - Minor renaming suggestion to the new grouper class names:
>> GroupByPartitionWithFixedTaskNum
>> and GroupBySystemStreamPartitionWithFixedTaskNum
>>
>> Thanks!
>>
>> - Yi
>>
>> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Navina,
>> >
>> > Thanks much for the comment. Please see my response below.
>> >
>> > Regarding your biggest gripe with the SEP, I personally think the
>> > operational requirement proposed in the KIP are pretty general and
>> could be
>> > easily enforced by other systems. The reason is that the module
>> operation
>> > is pretty standard and the default option when we choose partition. And
>> > usually the underlying system allows user to select arbitrary partition
>> > number if it supports partition expansion. Do you know any system that
>> does
>> > not meet these two requirement?
>> >
>> > Regarding your comment of the Motivation section, I renamed the first
>> > section as "Problem and Goal" and specified that "*The goal of this
>> > proposal is to enable partition expansion of the input streams*.". I
>> also
>> > put a sentence at the end of the Motivation section that "*The feature
>> of
>> > task expansion is out of the scope of this proposal and will be
>> addressed
>> > in a future SEP*". The second paragraph in the Motivation section is
>> mainly
>> > used to explain the thinking process that we have gone through, what
>> other
>> > alternative we have considered, and we plan to do in Samza in the nex
>> step.
>> >
>> > To answer your question why increasing the partition number will
>> increase
>> > the throughput of the kafka consumer in the container, Kafka consumer
>> can
>> > potentially fetch more data in one FetchResponse with more partitions in
>> > the FetchRequest. This is because we limit the maximum amount of data
>> that
>> > can be fetch for a given partition in the FetchResponse. This by
>> default is
>> > set to 1 MB. And there is reason that we can not arbitrarily bump up
>> this
>> > limit.
>> >
>> > To answer your question how partition expansion in Kafka impacts the
>> > clients, Kafka consumer is able to automatically detect new partition of
>> > the topic and reassign all (both old and new) partitions across
>> consumers
>> > in the consumer group IF you tell consumer the topic to be subscribed.
>> But
>> > consumer in Samza's container uses another way of subscription. Instead
>> of
>> > subscribing to the topic, the consumer in Samza's container subscribes
>> to
>> > the specific partitions of the topic. In this case, if new partitions
>> have
>> > been added, Samza will need to explicitly subscribe to the new
>> partitions
>> > of the topic. The "Handle partition expansion while tasks are running"
>> > section in the SEP addresses this issue in Samza -- it recalculates the
>> job
>> > model and restart container so that consumer can subscribe to the new
>> > partitions.
>> >
>> > I will ask other dev to take a look at the proposal. I will start the
>> > voting thread tomorrow if there is no further concern with the SEP.
>> >
>> > Thanks!
>> > Dong
>> >
>> >
>> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <
>> > nav...@apache.org>
>> > wrote:
>> >
>> > > Hey Dong,
>> > >
>> > > >  I have updated the motivation section to clarify this.
>> > >
>> > > Thanks for updating the motivation. Couple of notes here:
>> > >
>> > > 1.
>> > > > "The motivation of increasing partition number of Kafka topic
>> includes
>> > 1)
>> > > limit the maximum size of a partition in order to improve broker
>> > > performance and 2) increase throughput of Kafka consumer in the Samza
>> > > container."
>> > >
>> > > It's unclear to me how increasing the partition number will increase
>> the
>> > > throughput of the kafka consumer in the container? Theoretically, you
>> > will
>> > > still be consuming the same amount of data in the container,
>> irrespective
>> > > of whether it is coming from one partition or more than one expanded
>> > > partitions. Can you please explain it for me here, what you mean by
>> that?
>> > >
>> > > 2. I believe the second paragraph under motivation is simply talking
>> > about
>> > > the scope of the current SEP. It will be easier to read if what
>> solution
>> > is
>> > > included in this SEP and what is left out as not in scope. (for
>> example,
>> > > expansions for stateful jobs is supported or not).
>> > >
>> > > > We need to persist the task-to-sspList mapping in the
>> > > coordinator stream so that the job can derive the original number of
>> > > partitions of each input stream regardless of how many times the
>> > partition
>> > > has expanded. Does this make sense?
>> > >
>> > > Yes. It does!
>> > >
>> > > > I am not sure how this is related to the locality though. Can you
>> > clarify
>> > > your question if I haven't answered your question?
>> > >
>> > > It's not related. I just meant to give an example of yet another
>> > > coordinator message that is persisted. Your ssp-to-task mapping is
>> > > following a similar pattern for persisting. Just wanted to clarify
>> that.
>> > >
>> > > > Can you let me know if this, together with the answers in the
>> previous
>> > > email, addresses all your questions?
>> > >
>> > > Yes. I believe you have addressed most of my questions. Thanks for
>> taking
>> > > time to do that.
>> > >
>> > > > Is there specific question you have regarding partition
>> > > expansion in Kafka?
>> > >
>> > > I guess my questions are on how partition expansion in Kafka impacts
>> the
>> > > clients. Iiuc, partition expansions are done manually in Kafka based
>> on
>> > the
>> > > bytes-in rate of the partition. Do the existing kafka clients handle
>> this
>> > > expansion automatically? if yes, how does it work? If not, are there
>> > plans
>> > > to support it in the future?
>> > >
>> > > > Thus user's job should not need to bootstrap key/value store from
>> the
>> > > changelog topic.
>> > >
>> > > Why is this discussion relevant here? Key/value store / changelog
>> topic
>> > > partition is scoped with the context of a task. Since we are not
>> changing
>> > > the number of tasks, I don't think it is required to mention it here.
>> > >
>> > > > The new method takes the SystemStreamPartition-to-Task assignment
>> from
>> > > the previous job model which can be read from the coordinator stream.
>> > >
>> > > Jobmodel is currently not persisted to coordinator stream. In your
>> > design,
>> > > you talk about writing separate coordinator messages for ssp-to-task
>> > > assignments. Hence, please correct this statement. It is kind of
>> > misleading
>> > > to the reader.
>> > >
>> > > My biggest gripe with this SEP is that it seems like a tailor-made
>> > solution
>> > > that relies on the semantics of the Kafka system and yet, we are
>> trying
>> > to
>> > > masquerade that as operational requirements for other systems
>> interacting
>> > > with Samza. (Not to say that this is the first time such a choice is
>> > being
>> > > made in the Samza design). I am not seeing how this can a "general"
>> > > solution for all input systems. That's my two cents. I would like to
>> hear
>> > > alternative points of view for this from other devs.
>> > >
>> > > Please make sure you have enough eyes on this SEP. If you do, please
>> > start
>> > > a VOTE thread to approve this SEP.
>> > >
>> > > Thanks!
>> > > Navina
>> > >
>> > >
>> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > >
>> > > > Hey Navina,
>> > > >
>> > > > I have updated the wiki based on your suggestion. More
>> specifically, I
>> > > have
>> > > > made the following changes:
>> > > >
>> > > > - Improved Problem section and Motivation section to describe why we
>> > use
>> > > > the solution in this proposal instead of tackling the problem of
>> task
>> > > > expansion directly.
>> > > >
>> > > > - Illustrate the design in a way that doesn't bind to Kafka. Kafka
>> is
>> > > only
>> > > > used as example to illustrate why we want to expand partition
>> expansion
>> > > and
>> > > > whether the operational requirement can be supported when Kafka is
>> used
>> > > as
>> > > > the input system. Note that the proposed solution should work for
>> any
>> > > input
>> > > > system that meets the operational requirement described in the wiki.
>> > > >
>> > > > - Fixed the problem in the figure.
>> > > >
>> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the
>> > > wiki.
>> > > > Together with GroupByPartitionFixedTaskNum, it should ensure that we
>> > > have a
>> > > > solution to enable partition expansion for all users that are using
>> > > > pre-defined grouper in Samza. Note that those users who use custom
>> > > grouper
>> > > > would need to update their implementation.
>> > > >
>> > > > Can you let me know if this, together with the answers in the
>> previous
>> > > > email, addresses all your questions? Thanks for taking time to
>> review
>> > the
>> > > > proposal.
>> > > >
>> > > > Regards,
>> > > > Dong
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com>
>> > wrote:
>> > > >
>> > > > > Hey Navina,
>> > > > >
>> > > > > Thanks much for your comments. Please see my reply inline.
>> > > > >
>> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <
>> > > > > nav...@apache.org> wrote:
>> > > > >
>> > > > >> Thanks for the SEP, Dong. I have a couple of questions to
>> understand
>> > > > your
>> > > > >> proposal better:
>> > > > >>
>> > > > >> * Under motivation, you mention that "_We expect this solution to
>> > work
>> > > > >> similarly with other input system as well._", yet I don't see any
>> > > > >> discussion on how it will work with other input systems. That is,
>> > what
>> > > > >> kind
>> > > > >> of contract does samza expect from other input systems ? If we
>> are
>> > not
>> > > > >> planning to provide a generic solution, it might be worth
>> calling it
>> > > out
>> > > > >> in
>> > > > >> the SEP.
>> > > > >>
>> > > > >
>> > > > > I think the contract we expect from other systems are exactly the
>> > > > > operational requirement mentioned in the SEP, i.e. partitions
>> should
>> > > > always
>> > > > > be doubled and the hash algorithm should module the number of
>> > > partitions.
>> > > > > SEP-5 should also allow partition expansion of all input systems
>> that
>> > > > meet
>> > > > > these two requirements. I have updated the motivation section to
>> > > clarify
>> > > > > this.
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> * I understand the partition mapping logic you have proposed.
>> But I
>> > > > think
>> > > > >> the example explanation doesn't match the diagram. In the
>> diagram,
>> > > after
>> > > > >> expansion, partiion-0 and partition-1 are pointing to bucket 0
>> and
>> > > > >> partition-3 and partition-4 are pointing to bucket 1. I think the
>> > > former
>> > > > >> has to be partition-0 and partition-2 and the latter, is
>> partition-1
>> > > and
>> > > > >> partition-3. If I am wrong, please help me understand the logic
>> :)
>> > > > >>
>> > > > >
>> > > > > Good catch. I will update the figure to fix this problem.
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> * I don't know how partition expansion in Kafka works. I am
>> familiar
>> > > > with
>> > > > >> how shard splitting happens in Kinesis - there is hierarchical
>> > > relation
>> > > > >> between the parent and child shards. This way, it will also allow
>> > the
>> > > > >> shards to be merged back. Iiuc, Kafka only supports partition
>> > > > "expansion",
>> > > > >> as opposed to "splits". Can you provide some context or link
>> related
>> > > to
>> > > > >> how
>> > > > >> partition expansion works in Kafka?
>> > > > >>
>> > > > >
>> > > > > I couldn't find any wiki on partition expansion in Kafka. The
>> > partition
>> > > > > expansion logic in Kafka is very simply -- it simply adds new
>> > partition
>> > > > to
>> > > > > the existing topic. Is there specific question you have regarding
>> > > > partition
>> > > > > expansion in Kafka?
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> * Are you only recommending that expansion can be supported for
>> > samza
>> > > > jobs
>> > > > >> that use Kafka as input systems **and** configure the SSPGrouper
>> as
>> > > > >> GroupByPartitionFixedTaskNum? Sounds to me like this only applies
>> > for
>> > > > >> GroupByPartition. Please correct me if I am wrong. What is the
>> > > > expectation
>> > > > >> for custom SSP Groupers?
>> > > > >>
>> > > > >
>> > > > > The expansion can be supported for Samza jobs if the input system
>> > meets
>> > > > > the operational requirement mentioned above. It doesn't have to
>> use
>> > > Kafka
>> > > > > as input system.
>> > > > >
>> > > > > The current proposal provided solution for jobs that currently use
>> > > > > GroupByPartition. The proposal can be extended to support jobs
>> that
>> > use
>> > > > > other grouper that are pre-defined in Samza. The custom SSP
>> grouper
>> > > needs
>> > > > > to handle partition expansion similar to how
>> > > GroupByPartitionFixedTaskNum
>> > > > > handles it and it is users' responsibility to update their custom
>> > > grouper
>> > > > > implementation.
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> * Regarding storing SSP-to-Task assignment to coordinator stream:
>> > > Today,
>> > > > >> the JobModel encapsulates the data model in samza which also
>> > includes
>> > > > >> **TaskModels**. TaskModel, typically shows the task-to-sspList
>> > > mapping.
>> > > > >> What is the reason for using a separate coordinator stream
>> message
>> > > > >> *SetSSPTaskMapping*? Is it because the JobModel itself is not
>> > > persisted
>> > > > in
>> > > > >> the coordinator stream today?  The reason locality exists
>> outside of
>> > > the
>> > > > >> jobmodel is because *locality* information is written by each
>> > > container,
>> > > > >> where as it is consumed only by the leader jobcoordinator/AM. In
>> > this
>> > > > >> case,
>> > > > >> the writer of the mapping information and the reader is still the
>> > > leader
>> > > > >> jobcoordinator/AM. So, I want to understand the motivation for
>> this
>> > > > >> choice.
>> > > > >>
>> > > > >
>> > > > > Yes, the reason for using a separate coordinate stream message is
>> > > because
>> > > > > the task-to-sspList mapping is not currently persisted in the
>> > > coordinator
>> > > > > stream. We wouldn't need to create this new stream message if
>> > JobModel
>> > > is
>> > > > > persisted. We need to persist the task-to-sspList mapping in the
>> > > > > coordinator stream so that the job can derive the original number
>> of
>> > > > > partitions of each input stream regardless of how many times the
>> > > > partition
>> > > > > has expanded. Does this make sense?
>> > > > >
>> > > > > I am not sure how this is related to the locality though. Can you
>> > > clarify
>> > > > > your question if I haven't answered your question?
>> > > > >
>> > > > > Thanks!
>> > > > > Dong
>> > > > >
>> > > > >
>> > > > >>
>> > > > >> Cheers!
>> > > > >> Navina
>> > > > >>
>> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com>
>> > > wrote:
>> > > > >>
>> > > > >> > Hi all,
>> > > > >> >
>> > > > >> > We created SEP-5: Enable partition expansion of input streams.
>> > > Please
>> > > > >> find
>> > > > >> > the SEP wiki in the link
>> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
>> > > > >> > 5%3A+Enable+partition+expansion+of+input+streams
>> > > > >> > .
>> > > > >> >
>> > > > >> > You feedback is appreciated!
>> > > > >> >
>> > > > >> > Thanks,
>> > > > >> > Dong
>> > > > >> >
>> > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>

Reply via email to