Hey Yi,

Thanks much for the comments. I have updated the doc to address all of them
except the one about the interface. I am not sure I understand your
suggestion for the new interface. Let's discuss tomorrow.

Thanks,
Dong

On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Dong,
>
> Thanks for the detailed design doc for a long-awaited feature in Samza!
> Really appreciate it! I did a quick pass and have the following comments:
>
> - minor: "limit the maximum size of partition" ==> "limit the maximum size
> of each partition"
> - "However, Samza currently is not able to handle partition expansion of
> the input streams"==>better point out "for stateful jobs". For stateless
> jobs, simply bouncing the job now can pick up the new partitions.
> - "it is possible (e.g. with Kafka) that messages with a given key exists
> in both partition 1 an 3. Because GroupByPartition will assign partition 1
> and 3 to different tasks, messages with the same key may be handled by
> different task/container/process and their state will be stored in
> different changelog partition." The problem statement is not super clear
> here. The issues with stateful jobs is: after GroupByPartition assign
> partition 1 and 3 to different tasks, the new task handling partition 3
> does not have the previous state to resume the work. e.g. a page-key based
> counter would start from 0 in the new task for a specific key, instead of
> resuming the previous count 50 held by task 1.
> - minor rewording: "the first solution in this doc" ==> "the solution
> proposed in this doc"
> - "Thus additional development work is needed in Kafka to meet this
> requirement" It would be good to link to a KIP if and when it exists
> - Instead of touching/deprecating the interface
> SystemStreamPartitionGrouper, I would recommend adding a different
> implementation class of the interface whose constructor takes two
> parameters: a) the previous task number read from the coordinator stream;
> b) the configured new-partition to old-partition mapping policy. Then the
> grouper's interface method stays the same, and the grouper's behavior
> becomes configurable, which helps support a broader set of use cases in
> addition to Kafka's built-in partition expansion policies. (See the rough
> sketch after the last bullet below.)
> - Minor renaming suggestion to the new grouper class names:
> GroupByPartitionWithFixedTaskNum
> and GroupBySystemStreamPartitionWithFixedTaskNum
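>
> To make this concrete, here is a rough sketch of the kind of grouper I
> have in mind (the class name, the PartitionMappingPolicy type and the
> wiring are just illustrative, not a final API):
>
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.Map;
> import java.util.Set;
> import org.apache.samza.container.TaskName;
> import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouper;
> import org.apache.samza.system.SystemStreamPartition;
>
> // Hypothetical policy: maps a (possibly new) partition back to one of the
> // original partitions, e.g. newPartition % previousTaskCount.
> interface PartitionMappingPolicy {
>   int originalPartition(int newPartition, int previousTaskCount);
> }
>
> public class GroupByPartitionWithFixedTaskNum
>     implements SystemStreamPartitionGrouper {
>   private final int previousTaskCount;          // read from the coordinator stream
>   private final PartitionMappingPolicy policy;  // configured new->old partition mapping
>
>   public GroupByPartitionWithFixedTaskNum(int previousTaskCount,
>                                           PartitionMappingPolicy policy) {
>     this.previousTaskCount = previousTaskCount;
>     this.policy = policy;
>   }
>
>   @Override
>   public Map<TaskName, Set<SystemStreamPartition>> group(
>       Set<SystemStreamPartition> ssps) {
>     Map<TaskName, Set<SystemStreamPartition>> groups = new HashMap<>();
>     for (SystemStreamPartition ssp : ssps) {
>       // Map every partition (old or newly added) back to one of the
>       // original tasks, so the task count stays fixed after expansion.
>       int original = policy.originalPartition(
>           ssp.getPartition().getPartitionId(), previousTaskCount);
>       TaskName task = new TaskName("Partition " + original);
>       groups.computeIfAbsent(task, k -> new HashSet<>()).add(ssp);
>     }
>     return groups;
>   }
> }
>
> The point is that the framework-facing group() method is untouched; only
> the construction of the grouper needs to know the previous task count and
> the mapping policy.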
>
> Thanks!
>
> - Yi
>
> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Navina,
> >
> > Thanks much for the comments. Please see my responses below.
> >
> > Regarding your biggest gripe with the SEP, I personally think the
> > operational requirements proposed in the SEP are pretty general and could
> > be easily enforced by other systems. The reason is that the modulo
> > operation is pretty standard and is the default option when choosing a
> > partition. And the underlying system usually allows the user to expand to
> > an arbitrary number of partitions if it supports partition expansion at
> > all. Do you know of any system that does not meet these two requirements?
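> >
> > To make that concrete, here is a toy example (plain Java, not Samza or
> > Kafka code) of why these two requirements are sufficient: if the
> > producer picks partitions as hash % numPartitions and the partition
> > count only ever grows to a multiple of the old count, then mapping a new
> > partition back with % oldCount always recovers the partition (and hence
> > the task) that held the key before expansion.
> >
> > public class ModuloCheck {
> >   public static void main(String[] args) {
> >     int oldCount = 2, newCount = 4;              // partition count doubled
> >     int hash = Math.abs("some-key".hashCode());  // stand-in for the producer's key hash
> >     int oldPartition = hash % oldCount;          // partition before expansion
> >     int newPartition = hash % newCount;          // partition after expansion
> >     // Grouping new partition n into task (n % oldCount) keeps the key on its old task:
> >     System.out.println((newPartition % oldCount) == oldPartition);
> >     // prints true whenever newCount is a multiple of oldCount
> >   }
> > }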
> >
> > Regarding your comment on the Motivation section, I renamed the first
> > section to "Problem and Goal" and specified that "*The goal of this
> > proposal is to enable partition expansion of the input streams*.". I also
> > put a sentence at the end of the Motivation section that "*The feature of
> > task expansion is out of the scope of this proposal and will be addressed
> > in a future SEP*". The second paragraph in the Motivation section is
> > mainly used to explain the thinking process we have gone through, what
> > other alternatives we have considered, and what we plan to do in Samza as
> > the next step.
> >
> > To answer your question about why increasing the partition number will
> > increase the throughput of the Kafka consumer in the container: the Kafka
> > consumer can potentially fetch more data in one FetchResponse when there
> > are more partitions in the FetchRequest. This is because we limit the
> > maximum amount of data that can be fetched for a given partition in the
> > FetchResponse, which by default is set to 1 MB, and there are reasons why
> > we cannot arbitrarily bump up this limit.
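> >
> > For reference, the per-partition cap I am referring to is the consumer's
> > max.partition.fetch.bytes setting (1 MB by default), so the total data
> > returned per fetch grows with the number of partitions assigned to the
> > consumer. A minimal, illustrative consumer setup (the broker address and
> > group id below are placeholders):
> >
> > // needs java.util.Properties and org.apache.kafka.clients.consumer.KafkaConsumer
> > Properties props = new Properties();
> > props.put("bootstrap.servers", "localhost:9092");
> > props.put("group.id", "samza-container-consumer");
> > props.put("key.deserializer",
> >     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > props.put("value.deserializer",
> >     "org.apache.kafka.common.serialization.ByteArrayDeserializer");
> > props.put("max.partition.fetch.bytes", "1048576");  // per-partition cap, 1 MB (the default)
> > KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);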
> >
> > To answer your question about how partition expansion in Kafka impacts
> > the clients: the Kafka consumer is able to automatically detect new
> > partitions of the topic and reassign all (both old and new) partitions
> > across consumers in the consumer group IF you subscribe the consumer to
> > the topic. But the consumer in Samza's container uses another way of
> > subscribing: instead of subscribing to the topic, it subscribes to
> > specific partitions of the topic. In this case, if new partitions have
> > been added, Samza will need to explicitly subscribe to the new partitions
> > of the topic. The "Handle partition expansion while tasks are running"
> > section in the SEP addresses this issue in Samza -- it recalculates the
> > job model and restarts the containers so that the consumers can subscribe
> > to the new partitions.
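> >
> > To illustrate the two subscription styles with the plain Kafka consumer
> > API (Samza wraps this in its own system consumer, so this is only an
> > illustration; the topic name and partition numbers are made up), reusing
> > the consumer from the snippet above:
> >
> > // needs java.util.Arrays, java.util.Collections and
> > // org.apache.kafka.common.TopicPartition; use one style or the other,
> > // since KafkaConsumer does not allow mixing subscribe() and assign().
> >
> > // 1) Topic subscription: the consumer group rebalances and picks up new
> > //    partitions of the topic automatically.
> > consumer.subscribe(Collections.singletonList("page-views"));
> >
> > // 2) Explicit partition assignment (closer to what Samza's container
> > //    does): only the assigned partitions are consumed, so partitions
> > //    added later are NOT picked up until we assign them ourselves.
> > consumer.assign(Arrays.asList(
> >     new TopicPartition("page-views", 0),
> >     new TopicPartition("page-views", 1)));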
> >
> > I will ask other devs to take a look at the proposal. I will start the
> > voting thread tomorrow if there are no further concerns with the SEP.
> >
> > Thanks!
> > Dong
> >
> >
> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <
> > nav...@apache.org>
> > wrote:
> >
> > > Hey Dong,
> > >
> > > >  I have updated the motivation section to clarify this.
> > >
> > > Thanks for updating the motivation. Couple of notes here:
> > >
> > > 1.
> > > > "The motivation of increasing partition number of Kafka topic
> > > > includes 1) limit the maximum size of a partition in order to improve
> > > > broker performance and 2) increase throughput of Kafka consumer in
> > > > the Samza container."
> > >
> > > It's unclear to me how increasing the partition number will increase
> > > the throughput of the Kafka consumer in the container. Theoretically,
> > > you will still be consuming the same amount of data in the container,
> > > irrespective of whether it is coming from one partition or from more
> > > than one expanded partition. Can you please explain here what you mean
> > > by that?
> > >
> > > 2. I believe the second paragraph under motivation is simply talking
> > > about the scope of the current SEP. It will be easier to read if it
> > > states which solution is included in this SEP and what is left out as
> > > not in scope (for example, whether expansion for stateful jobs is
> > > supported or not).
> > >
> > > > We need to persist the task-to-sspList mapping in the coordinator
> > > > stream so that the job can derive the original number of partitions
> > > > of each input stream regardless of how many times the partition has
> > > > expanded. Does this make sense?
> > >
> > > Yes. It does!
> > >
> > > > I am not sure how this is related to the locality though. Can you
> > > > clarify your question if I haven't answered your question?
> > >
> > > It's not related. I just meant to give an example of yet another
> > > coordinator message that is persisted. Your ssp-to-task mapping follows
> > > a similar pattern for persistence. Just wanted to clarify that.
> > >
> > > > Can you let me know if this, together with the answers in the
> > > > previous email, addresses all your questions?
> > >
> > > Yes. I believe you have addressed most of my questions. Thanks for
> > > taking time to do that.
> > >
> > > > Is there specific question you have regarding partition expansion
> > > > in Kafka?
> > >
> > > I guess my questions are about how partition expansion in Kafka
> > > impacts the clients. IIUC, partition expansions are done manually in
> > > Kafka based on the bytes-in rate of the partition. Do the existing
> > > Kafka clients handle this expansion automatically? If yes, how does it
> > > work? If not, are there plans to support it in the future?
> > >
> > > > Thus user's job should not need to bootstrap key/value store from
> > > > the changelog topic.
> > >
> > > Why is this discussion relevant here? The key/value store / changelog
> > > topic partition is scoped to the context of a task. Since we are not
> > > changing the number of tasks, I don't think it is required to mention
> > > it here.
> > >
> > > > The new method takes the SystemStreamPartition-to-Task assignment
> > > > from the previous job model which can be read from the coordinator
> > > > stream.
> > >
> > > The JobModel is currently not persisted to the coordinator stream. In
> > > your design, you talk about writing separate coordinator messages for
> > > ssp-to-task assignments. Hence, please correct this statement; it is
> > > kind of misleading to the reader.
> > >
> > > My biggest gripe with this SEP is that it seems like a tailor-made
> > > solution that relies on the semantics of the Kafka system, and yet we
> > > are trying to masquerade that as operational requirements for other
> > > systems interacting with Samza. (Not to say that this is the first time
> > > such a choice is being made in the Samza design.) I am not seeing how
> > > this can be a "general" solution for all input systems. That's my two
> > > cents. I would like to hear alternative points of view on this from
> > > other devs.
> > >
> > > Please make sure you have enough eyes on this SEP. If you do, please
> > > start a VOTE thread to approve this SEP.
> > >
> > > Thanks!
> > > Navina
> > >
> > >
> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com>
> wrote:
> > >
> > > > Hey Navina,
> > > >
> > > > I have updated the wiki based on your suggestion. More specifically,
> > > > I have made the following changes:
> > > >
> > > > - Improved the Problem section and the Motivation section to describe
> > > > why we use the solution in this proposal instead of tackling the
> > > > problem of task expansion directly.
> > > >
> > > > - Illustrated the design in a way that doesn't bind it to Kafka.
> > > > Kafka is only used as an example to illustrate why we want to support
> > > > partition expansion and whether the operational requirement can be
> > > > supported when Kafka is used as the input system. Note that the
> > > > proposed solution should work for any input system that meets the
> > > > operational requirement described in the wiki.
> > > >
> > > > - Fixed the problem in the figure.
> > > >
> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the
> > > > wiki. Together with GroupByPartitionFixedTaskNum, it should ensure
> > > > that we have a solution to enable partition expansion for all users
> > > > that are using a pre-defined grouper in Samza. Note that users who
> > > > use a custom grouper would need to update their implementation.
> > > >
> > > > Can you let me know if this, together with the answers in the
> > > > previous email, addresses all your questions? Thanks for taking time
> > > > to review the proposal.
> > > >
> > > > Regards,
> > > > Dong
> > > >
> > > >
> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >
> > > > > Hey Navina,
> > > > >
> > > > > Thanks much for your comments. Please see my reply inline.
> > > > >
> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <
> > > > > nav...@apache.org> wrote:
> > > > >
> > > > >> Thanks for the SEP, Dong. I have a couple of questions to
> > > > >> understand your proposal better:
> > > > >>
> > > > >> * Under motivation, you mention that "_We expect this solution to
> > > > >> work similarly with other input system as well._", yet I don't see
> > > > >> any discussion of how it will work with other input systems. That
> > > > >> is, what kind of contract does Samza expect from other input
> > > > >> systems? If we are not planning to provide a generic solution, it
> > > > >> might be worth calling it out in the SEP.
> > > > >>
> > > > >
> > > > > I think the contract we expect from other systems is exactly the
> > > > > operational requirement mentioned in the SEP, i.e. partitions should
> > > > > always be doubled and the hash algorithm should take the key's hash
> > > > > modulo the number of partitions. SEP-5 should also allow partition
> > > > > expansion for all input systems that meet these two requirements. I
> > > > > have updated the motivation section to clarify this.
> > > > >
> > > > >
> > > > >>
> > > > >> * I understand the partition mapping logic you have proposed. But
> > > > >> I think the example explanation doesn't match the diagram. In the
> > > > >> diagram, after expansion, partition-0 and partition-1 are pointing
> > > > >> to bucket 0 and partition-3 and partition-4 are pointing to bucket
> > > > >> 1. I think the former has to be partition-0 and partition-2, and
> > > > >> the latter partition-1 and partition-3. If I am wrong, please help
> > > > >> me understand the logic :)
> > > > >>
> > > > >
> > > > > Good catch. I will update the figure to fix this problem.
> > > > >
> > > > >
> > > > >>
> > > > >> * I don't know how partition expansion in Kafka works. I am
> > > > >> familiar with how shard splitting happens in Kinesis - there is a
> > > > >> hierarchical relation between the parent and child shards, which
> > > > >> also allows the shards to be merged back. IIUC, Kafka only supports
> > > > >> partition "expansion", as opposed to "splits". Can you provide some
> > > > >> context or a link related to how partition expansion works in
> > > > >> Kafka?
> > > > >>
> > > > >
> > > > > I couldn't find any wiki on partition expansion in Kafka. The
> > > > > partition expansion logic in Kafka is very simple -- it simply adds
> > > > > new partitions to the existing topic. Is there a specific question
> > > > > you have regarding partition expansion in Kafka?
> > > > >
> > > > >
> > > > >>
> > > > >> * Are you only recommending that expansion can be supported for
> > > > >> Samza jobs that use Kafka as the input system **and** configure the
> > > > >> SSPGrouper as GroupByPartitionFixedTaskNum? Sounds to me like this
> > > > >> only applies to GroupByPartition. Please correct me if I am wrong.
> > > > >> What is the expectation for custom SSP groupers?
> > > > >>
> > > > >
> > > > > The expansion can be supported for Samza jobs if the input system
> > > > > meets the operational requirement mentioned above. It doesn't have
> > > > > to use Kafka as the input system.
> > > > >
> > > > > The current proposal provides a solution for jobs that currently use
> > > > > GroupByPartition. The proposal can be extended to support jobs that
> > > > > use the other groupers that are pre-defined in Samza. A custom SSP
> > > > > grouper needs to handle partition expansion similarly to how
> > > > > GroupByPartitionFixedTaskNum handles it, and it is the user's
> > > > > responsibility to update their custom grouper implementation.
> > > > >
> > > > >
> > > > >>
> > > > >> * Regarding storing the SSP-to-Task assignment in the coordinator
> > > > >> stream: today, the JobModel encapsulates the data model in Samza,
> > > > >> which also includes **TaskModels**. The TaskModel typically shows
> > > > >> the task-to-sspList mapping. What is the reason for using a
> > > > >> separate coordinator stream message *SetSSPTaskMapping*? Is it
> > > > >> because the JobModel itself is not persisted in the coordinator
> > > > >> stream today? The reason locality exists outside of the JobModel is
> > > > >> that *locality* information is written by each container, whereas
> > > > >> it is consumed only by the leader jobcoordinator/AM. In this case,
> > > > >> the writer and the reader of the mapping information are both the
> > > > >> leader jobcoordinator/AM. So, I want to understand the motivation
> > > > >> for this choice.
> > > > >>
> > > > >
> > > > > Yes, the reason for using a separate coordinator stream message is
> > > > > that the task-to-sspList mapping is not currently persisted in the
> > > > > coordinator stream. We wouldn't need to create this new stream
> > > > > message if the JobModel were persisted. We need to persist the
> > > > > task-to-sspList mapping in the coordinator stream so that the job
> > > > > can derive the original number of partitions of each input stream
> > > > > regardless of how many times the partitions have expanded. Does this
> > > > > make sense?
> > > > >
> > > > > I am not sure how this is related to locality, though. Can you
> > > > > clarify your question if I haven't answered it?
> > > > >
> > > > > Thanks!
> > > > > Dong
> > > > >
> > > > >
> > > > >>
> > > > >> Cheers!
> > > > >> Navina
> > > > >>
> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > >>
> > > > >> > Hi all,
> > > > >> >
> > > > >> > We created SEP-5: Enable partition expansion of input streams.
> > > > >> > Please find the SEP wiki in the link
> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > > >> > .
> > > > >> >
> > > > >> > Your feedback is appreciated!
> > > > >> >
> > > > >> > Thanks,
> > > > >> > Dong
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>
