Hey Jacob, Navina, Yi,

I am wondering if my answer has addressed your concern. Can you let me know
if there is any concern with SEP?

Thanks,
Dong

On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jacob,
>
> Thanks for taking time to review the SEP.
>
> I agree with you and Navina that the current SEP doesn't provide support
> to arbitrary input systems and it doesn't support partition shrink. I think
> the scope of this SEP is to support partition expansion for Kafka (the most
> widely used input system of Samza) and keep the door open for partition
> support of various input systems. The current design can support any system
> that meets the two operational requirement specified in the doc.
>
> While it is possible to support more types of input systems, it will
> likely add more complexity to the design. For example, the first
> alternative solution from you requires broker-side support to negotiate
> hash algorithm. The second alternative solution requires changelog
> partition reshuffle which carries its own design complexity and performance
> overhead. There is tradeoff between the generality and the complexity among
> these choices. I like the current design because it is simple and addresses
> a big usage scenario for us. We can add more complexity to generalize the
> design if it enables important use-case. Does this sound reasonable?
>
> Note that the "Rejected Alternative" section also mentions the possibility
> of supporting a wider range of input systems by allowing user to specify
> the new-partition to old-partition mapping. We are not doing it because 1)
> we may have better understanding of the design after we have a specific
> second input system to support 2) the current design can be extended to
> support general input systems. I think similar argument can be applied
> explain why we don't have to support general input systems using the
> potentially-good alternatives you mentioned.
>
> I hope SEP-5 can be an important first-step towards supporting partition
> expansion of any input system.
>
> To answer your questions about the current proposal:
>
> >1. "An alternative solution is to allow task number to increase after
> >partition expansion and uses a proper task-to-container assignment to
> make
> >sure the Samza output is correct." What does the container have to do
> with
> >stateful processing or output in general?
>
> The task-to-container assignment matters because if the correlated tasks
> (i.e. tasks that consume messages with the same key) needs to be in the
> same container so that they can share the same key/value local store on the
> same physical machine.
>
> >2. When you use "Join" as an example, you basically mean multiple
> >co-partitioned streams, right? This is opposed to multiple,
> >independently-partitioned streams or a single stream. Would be nice to
> >formulate the proposal in these more general terms.
>
> I thought "join" is a commonly used to refer to the join opeartion with
> co-partitioned stream but I may be wrong. I have updated the wiki to
> explicitly mention "co-partitioned stream". Does this look better now?
>
> >3. When switching SSP groupers, how will the users avoid the
> >org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartition
> GrouperFactoryValues
> >exception?
>
> I think we can hardcode new logic in KafkaCheckpointLogKey.scala such that
> exception will not be thrown if new grouper is
> GroupByPartitionWithFixedTaskNum and old grouper is GroupByPartition.
> Does this look reasonable?
>
> >4. Partition to task assignment is meaningless without key to partition
> >mapping. The real semantics are captured in the external requirement for
> >partitioning via hash+modulo. But in that case, iiuc, only the partition
> >count matters. So why not just store the original partition count rather
> >than the whole mapping?
>
> I think storing the previous task-to-partition mapping is more general
> than storing the partition count of all topics for the following reasons:
>
> - Samza already stores the task-to-container mapping and container-to-host
> mapping in the coordinator stream. It seems consistent to also store the
> partition-to-task mapping. And this information may be useful for other
> use-case such as debugging.
>
> - By having the new interface take the previous task-to-partition
> assignment instead of a topic-to-partition-count mapping as new parameter,
> we can potentially have grouper implementation to support other types of
> input systems.
>
> - It is sightly simpler to store the task-to-partition assignment because
> we don't need to know whether this is the first time a job is started or
> not. On the other hand, you can write topic-to-partition-count mapping to
> the coordinator stream only if this is the first time the job is run
>
> Thanks,
> Dong
>
> On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>
>> Hey Dong,
>>
>> Thanks for the SEP. Supporting partition changes is critically important
>> for stateful Samza jobs, so it's great to see some ideas on that front!
>>
>> Sorry for the late feedback, but I have a few thoughts to contribute.
>>
>> Big +1 on Navina's comment:
>>
>> > My biggest gripe with this SEP is that it seems like a tailor-made
>> > solution
>> > that relies on the semantics of the Kafka system and yet, we are trying
>> to
>> > masquerade that as operational requirements for other systems
>> interacting
>> > with Samza. (Not to say that this is the first time such a choice is
>> being
>> > made in the Samza design). I am not seeing how this can a "general"
>> > solution for all input systems. That's my two cents. I would like to
>> hear
>> > alternative points of view for this from other devs.
>>
>>
>> Two examples of this:
>> 1. This is mostly a hypothetical, but some message brokers may use key
>> range assignment rather than hash+modulo.
>> 2. Kafka can't reduce the number of partitions, but it can happen on other
>> systems. For example, it may be cheaper to reduce the number of partitions
>> on a hosted service where the cost model depends on the number of
>> partitions/shards.
>>
>> It seems to me that a solution which doesn't depend on partition key
>> assignment in the message broker. Here are a few alternatives that weren't
>> discussed and I think should be considered:
>>
>> Alternatives in order of increasing preference:
>> 1. Samza manages the partition hash (via some new contract with the
>> brokers) and guarantees correct routing of keys among the new partitions.
>> 2. Samza detects a task count change, creates a new changelog with correct
>> partitions, and *somehow* reshuffles all existing changelog data into the
>> new topic and then uses the new topic from then on. (doesn't work without
>> changelog, but in that case durability isn't paramount, so we can just
>> wipe)
>> 3. Use RPC in between stages and samza fully manages key assignment among
>> tasks. No on-disk topic data to clean up. Mandatory repartitioning in the
>> first stage to pre-scaled tasks in next stage.
>> 4. Combined 2-3 solution
>>
>> Finally, some questions about the current proposal:
>> 1. "An alternative solution is to allow task number to increase after
>> partition expansion and uses a proper task-to-container assignment to make
>> sure the Samza output is correct." What does the container have to do with
>> stateful processing or output in general?
>> 2. When you use "Join" as an example, you basically mean multiple
>> co-partitioned streams, right? This is opposed to multiple,
>> independently-partitioned streams or a single stream. Would be nice to
>> formulate the proposal in these more general terms.
>> 3. When switching SSP groupers, how will the users avoid the
>> org.apache.samza.checkpoint.kafka.DifferingSystemStreamParti
>> tionGrouperFactoryValues
>> exception?
>> 4. Partition to task assignment is meaningless without key to partition
>> mapping. The real semantics are captured in the external requirement for
>> partitioning via hash+modulo. But in that case, iiuc, only the partition
>> count matters. So why not just store the original partition count rather
>> than the whole mapping?
>>
>> -Jake
>>
>> On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>> > Hey Yi, Navina,
>> >
>> > I have updated the SEP-5 document based on our discussion. The
>> difference
>> > can be found here
>> > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?
>> > pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>.
>> > Here is the summary of changes:
>> >
>> > - Add new interface that extends the existing interface
>> > SystemStreamPartitionGrouper. Newly-added grouper class should implement
>> > this interface.
>> > - Explained in the Rejected Alternative Section why we don't add new
>> method
>> > in the existing interface
>> > - Explained in the Rejected Alternative Section why we don't
>> config/class
>> > for user to specify new-partition to old-partition mapping.
>> >
>> > Can you take another look at the proposal and let me know if there is
>> any
>> > concern?
>> >
>> > Cheers,
>> > Dong
>> >
>> >
>> > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > > Hey Yi,
>> > >
>> > > Thanks much for the comment. I have updated the doc to address all
>> your
>> > > comments except the one related to the interface. I am not sure I
>> > > understand your suggestion of the new interface. Will discuss
>> tomorrow.
>> > >
>> > > Thanks,
>> > > Dong
>> > >
>> > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com> wrote:
>> > >
>> > >> Hi, Don,
>> > >>
>> > >> Thanks for the detailed design doc for a long-waited feature in
>> Samza!
>> > >> Really appreciate it! I did a quick pass and have the following
>> > comments:
>> > >>
>> > >> - minor: "limit the maximum size of partition" ==> "limit the maximum
>> > size
>> > >> of each partition"
>> > >> - "However, Samza currently is not able to handle partition
>> expansion of
>> > >> the input streams"==>better point out "for stateful jobs". For
>> stateless
>> > >> jobs, simply bouncing the job now can pick up the new partitions.
>> > >> - "it is possible (e.g. with Kafka) that messages with a given key
>> > exists
>> > >> in both partition 1 an 3. Because GroupByPartition will assign
>> > partition 1
>> > >> and 3 to different tasks, messages with the same key may be handled
>> by
>> > >> different task/container/process and their state will be stored in
>> > >> different changelog partition." The problem statement is not super
>> clear
>> > >> here. The issues with stateful jobs is: after GroupByPartition assign
>> > >> partition 1 and 3 to different tasks, the new task handling
>> partition 3
>> > >> does not have the previous state to resume the work. e.g. a page-key
>> > based
>> > >> counter would start from 0 in the new task for a specific key,
>> instead
>> > of
>> > >> resuming the previous count 50 held by task 1.
>> > >> - minor rewording: "the first solution in this doc" ==> "the solution
>> > >> proposed in this doc"
>> > >> - "Thus additional development work is needed in Kafka to meet this
>> > >> requirement" It would be good to link to a KIP if and when it exists
>> > >> - Instead of touching/deprecating the interface
>> > >> SystemStreamPartitionGrouper, I would recommend to have a different
>> > >> implementation class of the interface, which in the constructor of
>> the
>> > >> grouper, takes two parameters: a) the previous task number read from
>> the
>> > >> coordinator stream; b) the configured new-partition to old-partition
>> > >> mapping policy. Then, the grouper's interface method stays the same
>> and
>> > >> the
>> > >> behavior of the grouper is more configurable which is good to
>> support a
>> > >> broader set of use cases in addition to Kafka's built-in partition
>> > >> expansion policies.
>> > >> - Minor renaming suggestion to the new grouper class names:
>> > >> GroupByPartitionWithFixedTaskNum
>> > >> and GroupBySystemStreamPartitionWithFixedTaskNum
>> > >>
>> > >> Thanks!
>> > >>
>> > >> - Yi
>> > >>
>> > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > >>
>> > >> > Hey Navina,
>> > >> >
>> > >> > Thanks much for the comment. Please see my response below.
>> > >> >
>> > >> > Regarding your biggest gripe with the SEP, I personally think the
>> > >> > operational requirement proposed in the KIP are pretty general and
>> > >> could be
>> > >> > easily enforced by other systems. The reason is that the module
>> > >> operation
>> > >> > is pretty standard and the default option when we choose partition.
>> > And
>> > >> > usually the underlying system allows user to select arbitrary
>> > partition
>> > >> > number if it supports partition expansion. Do you know any system
>> that
>> > >> does
>> > >> > not meet these two requirement?
>> > >> >
>> > >> > Regarding your comment of the Motivation section, I renamed the
>> first
>> > >> > section as "Problem and Goal" and specified that "*The goal of this
>> > >> > proposal is to enable partition expansion of the input streams*.".
>> I
>> > >> also
>> > >> > put a sentence at the end of the Motivation section that "*The
>> feature
>> > >> of
>> > >> > task expansion is out of the scope of this proposal and will be
>> > >> addressed
>> > >> > in a future SEP*". The second paragraph in the Motivation section
>> is
>> > >> mainly
>> > >> > used to explain the thinking process that we have gone through,
>> what
>> > >> other
>> > >> > alternative we have considered, and we plan to do in Samza in the
>> nex
>> > >> step.
>> > >> >
>> > >> > To answer your question why increasing the partition number will
>> > >> increase
>> > >> > the throughput of the kafka consumer in the container, Kafka
>> consumer
>> > >> can
>> > >> > potentially fetch more data in one FetchResponse with more
>> partitions
>> > in
>> > >> > the FetchRequest. This is because we limit the maximum amount of
>> data
>> > >> that
>> > >> > can be fetch for a given partition in the FetchResponse. This by
>> > >> default is
>> > >> > set to 1 MB. And there is reason that we can not arbitrarily bump
>> up
>> > >> this
>> > >> > limit.
>> > >> >
>> > >> > To answer your question how partition expansion in Kafka impacts
>> the
>> > >> > clients, Kafka consumer is able to automatically detect new
>> partition
>> > of
>> > >> > the topic and reassign all (both old and new) partitions across
>> > >> consumers
>> > >> > in the consumer group IF you tell consumer the topic to be
>> subscribed.
>> > >> But
>> > >> > consumer in Samza's container uses another way of subscription.
>> > Instead
>> > >> of
>> > >> > subscribing to the topic, the consumer in Samza's container
>> subscribes
>> > >> to
>> > >> > the specific partitions of the topic. In this case, if new
>> partitions
>> > >> have
>> > >> > been added, Samza will need to explicitly subscribe to the new
>> > >> partitions
>> > >> > of the topic. The "Handle partition expansion while tasks are
>> running"
>> > >> > section in the SEP addresses this issue in Samza -- it recalculates
>> > the
>> > >> job
>> > >> > model and restart container so that consumer can subscribe to the
>> new
>> > >> > partitions.
>> > >> >
>> > >> > I will ask other dev to take a look at the proposal. I will start
>> the
>> > >> > voting thread tomorrow if there is no further concern with the SEP.
>> > >> >
>> > >> > Thanks!
>> > >> > Dong
>> > >> >
>> > >> >
>> > >> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <
>> > >> > nav...@apache.org>
>> > >> > wrote:
>> > >> >
>> > >> > > Hey Dong,
>> > >> > >
>> > >> > > >  I have updated the motivation section to clarify this.
>> > >> > >
>> > >> > > Thanks for updating the motivation. Couple of notes here:
>> > >> > >
>> > >> > > 1.
>> > >> > > > "The motivation of increasing partition number of Kafka topic
>> > >> includes
>> > >> > 1)
>> > >> > > limit the maximum size of a partition in order to improve broker
>> > >> > > performance and 2) increase throughput of Kafka consumer in the
>> > Samza
>> > >> > > container."
>> > >> > >
>> > >> > > It's unclear to me how increasing the partition number will
>> increase
>> > >> the
>> > >> > > throughput of the kafka consumer in the container? Theoretically,
>> > you
>> > >> > will
>> > >> > > still be consuming the same amount of data in the container,
>> > >> irrespective
>> > >> > > of whether it is coming from one partition or more than one
>> expanded
>> > >> > > partitions. Can you please explain it for me here, what you mean
>> by
>> > >> that?
>> > >> > >
>> > >> > > 2. I believe the second paragraph under motivation is simply
>> talking
>> > >> > about
>> > >> > > the scope of the current SEP. It will be easier to read if what
>> > >> solution
>> > >> > is
>> > >> > > included in this SEP and what is left out as not in scope. (for
>> > >> example,
>> > >> > > expansions for stateful jobs is supported or not).
>> > >> > >
>> > >> > > > We need to persist the task-to-sspList mapping in the
>> > >> > > coordinator stream so that the job can derive the original
>> number of
>> > >> > > partitions of each input stream regardless of how many times the
>> > >> > partition
>> > >> > > has expanded. Does this make sense?
>> > >> > >
>> > >> > > Yes. It does!
>> > >> > >
>> > >> > > > I am not sure how this is related to the locality though. Can
>> you
>> > >> > clarify
>> > >> > > your question if I haven't answered your question?
>> > >> > >
>> > >> > > It's not related. I just meant to give an example of yet another
>> > >> > > coordinator message that is persisted. Your ssp-to-task mapping
>> is
>> > >> > > following a similar pattern for persisting. Just wanted to
>> clarify
>> > >> that.
>> > >> > >
>> > >> > > > Can you let me know if this, together with the answers in the
>> > >> previous
>> > >> > > email, addresses all your questions?
>> > >> > >
>> > >> > > Yes. I believe you have addressed most of my questions. Thanks
>> for
>> > >> taking
>> > >> > > time to do that.
>> > >> > >
>> > >> > > > Is there specific question you have regarding partition
>> > >> > > expansion in Kafka?
>> > >> > >
>> > >> > > I guess my questions are on how partition expansion in Kafka
>> impacts
>> > >> the
>> > >> > > clients. Iiuc, partition expansions are done manually in Kafka
>> based
>> > >> on
>> > >> > the
>> > >> > > bytes-in rate of the partition. Do the existing kafka clients
>> handle
>> > >> this
>> > >> > > expansion automatically? if yes, how does it work? If not, are
>> there
>> > >> > plans
>> > >> > > to support it in the future?
>> > >> > >
>> > >> > > > Thus user's job should not need to bootstrap key/value store
>> from
>> > >> the
>> > >> > > changelog topic.
>> > >> > >
>> > >> > > Why is this discussion relevant here? Key/value store / changelog
>> > >> topic
>> > >> > > partition is scoped with the context of a task. Since we are not
>> > >> changing
>> > >> > > the number of tasks, I don't think it is required to mention it
>> > here.
>> > >> > >
>> > >> > > > The new method takes the SystemStreamPartition-to-Task
>> assignment
>> > >> from
>> > >> > > the previous job model which can be read from the coordinator
>> > stream.
>> > >> > >
>> > >> > > Jobmodel is currently not persisted to coordinator stream. In
>> your
>> > >> > design,
>> > >> > > you talk about writing separate coordinator messages for
>> ssp-to-task
>> > >> > > assignments. Hence, please correct this statement. It is kind of
>> > >> > misleading
>> > >> > > to the reader.
>> > >> > >
>> > >> > > My biggest gripe with this SEP is that it seems like a
>> tailor-made
>> > >> > solution
>> > >> > > that relies on the semantics of the Kafka system and yet, we are
>> > >> trying
>> > >> > to
>> > >> > > masquerade that as operational requirements for other systems
>> > >> interacting
>> > >> > > with Samza. (Not to say that this is the first time such a
>> choice is
>> > >> > being
>> > >> > > made in the Samza design). I am not seeing how this can a
>> "general"
>> > >> > > solution for all input systems. That's my two cents. I would
>> like to
>> > >> hear
>> > >> > > alternative points of view for this from other devs.
>> > >> > >
>> > >> > > Please make sure you have enough eyes on this SEP. If you do,
>> please
>> > >> > start
>> > >> > > a VOTE thread to approve this SEP.
>> > >> > >
>> > >> > > Thanks!
>> > >> > > Navina
>> > >> > >
>> > >> > >
>> > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <lindon...@gmail.com>
>> > >> wrote:
>> > >> > >
>> > >> > > > Hey Navina,
>> > >> > > >
>> > >> > > > I have updated the wiki based on your suggestion. More
>> > >> specifically, I
>> > >> > > have
>> > >> > > > made the following changes:
>> > >> > > >
>> > >> > > > - Improved Problem section and Motivation section to describe
>> why
>> > we
>> > >> > use
>> > >> > > > the solution in this proposal instead of tackling the problem
>> of
>> > >> task
>> > >> > > > expansion directly.
>> > >> > > >
>> > >> > > > - Illustrate the design in a way that doesn't bind to Kafka.
>> Kafka
>> > >> is
>> > >> > > only
>> > >> > > > used as example to illustrate why we want to expand partition
>> > >> expansion
>> > >> > > and
>> > >> > > > whether the operational requirement can be supported when
>> Kafka is
>> > >> used
>> > >> > > as
>> > >> > > > the input system. Note that the proposed solution should work
>> for
>> > >> any
>> > >> > > input
>> > >> > > > system that meets the operational requirement described in the
>> > wiki.
>> > >> > > >
>> > >> > > > - Fixed the problem in the figure.
>> > >> > > >
>> > >> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum
>> to
>> > the
>> > >> > > wiki.
>> > >> > > > Together with GroupByPartitionFixedTaskNum, it should ensure
>> that
>> > we
>> > >> > > have a
>> > >> > > > solution to enable partition expansion for all users that are
>> > using
>> > >> > > > pre-defined grouper in Samza. Note that those users who use
>> custom
>> > >> > > grouper
>> > >> > > > would need to update their implementation.
>> > >> > > >
>> > >> > > > Can you let me know if this, together with the answers in the
>> > >> previous
>> > >> > > > email, addresses all your questions? Thanks for taking time to
>> > >> review
>> > >> > the
>> > >> > > > proposal.
>> > >> > > >
>> > >> > > > Regards,
>> > >> > > > Dong
>> > >> > > >
>> > >> > > >
>> > >> > > >
>> > >> > > >
>> > >> > > >
>> > >> > > >
>> > >> > > >
>> > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <
>> lindon...@gmail.com>
>> > >> > wrote:
>> > >> > > >
>> > >> > > > > Hey Navina,
>> > >> > > > >
>> > >> > > > > Thanks much for your comments. Please see my reply inline.
>> > >> > > > >
>> > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <
>> > >> > > > > nav...@apache.org> wrote:
>> > >> > > > >
>> > >> > > > >> Thanks for the SEP, Dong. I have a couple of questions to
>> > >> understand
>> > >> > > > your
>> > >> > > > >> proposal better:
>> > >> > > > >>
>> > >> > > > >> * Under motivation, you mention that "_We expect this
>> solution
>> > to
>> > >> > work
>> > >> > > > >> similarly with other input system as well._", yet I don't
>> see
>> > any
>> > >> > > > >> discussion on how it will work with other input systems.
>> That
>> > is,
>> > >> > what
>> > >> > > > >> kind
>> > >> > > > >> of contract does samza expect from other input systems ? If
>> we
>> > >> are
>> > >> > not
>> > >> > > > >> planning to provide a generic solution, it might be worth
>> > >> calling it
>> > >> > > out
>> > >> > > > >> in
>> > >> > > > >> the SEP.
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > I think the contract we expect from other systems are exactly
>> > the
>> > >> > > > > operational requirement mentioned in the SEP, i.e. partitions
>> > >> should
>> > >> > > > always
>> > >> > > > > be doubled and the hash algorithm should module the number of
>> > >> > > partitions.
>> > >> > > > > SEP-5 should also allow partition expansion of all input
>> systems
>> > >> that
>> > >> > > > meet
>> > >> > > > > these two requirements. I have updated the motivation
>> section to
>> > >> > > clarify
>> > >> > > > > this.
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >>
>> > >> > > > >> * I understand the partition mapping logic you have
>> proposed.
>> > >> But I
>> > >> > > > think
>> > >> > > > >> the example explanation doesn't match the diagram. In the
>> > >> diagram,
>> > >> > > after
>> > >> > > > >> expansion, partiion-0 and partition-1 are pointing to
>> bucket 0
>> > >> and
>> > >> > > > >> partition-3 and partition-4 are pointing to bucket 1. I
>> think
>> > the
>> > >> > > former
>> > >> > > > >> has to be partition-0 and partition-2 and the latter, is
>> > >> partition-1
>> > >> > > and
>> > >> > > > >> partition-3. If I am wrong, please help me understand the
>> logic
>> > >> :)
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > Good catch. I will update the figure to fix this problem.
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >>
>> > >> > > > >> * I don't know how partition expansion in Kafka works. I am
>> > >> familiar
>> > >> > > > with
>> > >> > > > >> how shard splitting happens in Kinesis - there is
>> hierarchical
>> > >> > > relation
>> > >> > > > >> between the parent and child shards. This way, it will also
>> > allow
>> > >> > the
>> > >> > > > >> shards to be merged back. Iiuc, Kafka only supports
>> partition
>> > >> > > > "expansion",
>> > >> > > > >> as opposed to "splits". Can you provide some context or link
>> > >> related
>> > >> > > to
>> > >> > > > >> how
>> > >> > > > >> partition expansion works in Kafka?
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > I couldn't find any wiki on partition expansion in Kafka. The
>> > >> > partition
>> > >> > > > > expansion logic in Kafka is very simply -- it simply adds new
>> > >> > partition
>> > >> > > > to
>> > >> > > > > the existing topic. Is there specific question you have
>> > regarding
>> > >> > > > partition
>> > >> > > > > expansion in Kafka?
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >>
>> > >> > > > >> * Are you only recommending that expansion can be supported
>> for
>> > >> > samza
>> > >> > > > jobs
>> > >> > > > >> that use Kafka as input systems **and** configure the
>> > SSPGrouper
>> > >> as
>> > >> > > > >> GroupByPartitionFixedTaskNum? Sounds to me like this only
>> > applies
>> > >> > for
>> > >> > > > >> GroupByPartition. Please correct me if I am wrong. What is
>> the
>> > >> > > > expectation
>> > >> > > > >> for custom SSP Groupers?
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > The expansion can be supported for Samza jobs if the input
>> > system
>> > >> > meets
>> > >> > > > > the operational requirement mentioned above. It doesn't have
>> to
>> > >> use
>> > >> > > Kafka
>> > >> > > > > as input system.
>> > >> > > > >
>> > >> > > > > The current proposal provided solution for jobs that
>> currently
>> > use
>> > >> > > > > GroupByPartition. The proposal can be extended to support
>> jobs
>> > >> that
>> > >> > use
>> > >> > > > > other grouper that are pre-defined in Samza. The custom SSP
>> > >> grouper
>> > >> > > needs
>> > >> > > > > to handle partition expansion similar to how
>> > >> > > GroupByPartitionFixedTaskNum
>> > >> > > > > handles it and it is users' responsibility to update their
>> > custom
>> > >> > > grouper
>> > >> > > > > implementation.
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >>
>> > >> > > > >> * Regarding storing SSP-to-Task assignment to coordinator
>> > stream:
>> > >> > > Today,
>> > >> > > > >> the JobModel encapsulates the data model in samza which also
>> > >> > includes
>> > >> > > > >> **TaskModels**. TaskModel, typically shows the
>> task-to-sspList
>> > >> > > mapping.
>> > >> > > > >> What is the reason for using a separate coordinator stream
>> > >> message
>> > >> > > > >> *SetSSPTaskMapping*? Is it because the JobModel itself is
>> not
>> > >> > > persisted
>> > >> > > > in
>> > >> > > > >> the coordinator stream today?  The reason locality exists
>> > >> outside of
>> > >> > > the
>> > >> > > > >> jobmodel is because *locality* information is written by
>> each
>> > >> > > container,
>> > >> > > > >> where as it is consumed only by the leader
>> jobcoordinator/AM.
>> > In
>> > >> > this
>> > >> > > > >> case,
>> > >> > > > >> the writer of the mapping information and the reader is
>> still
>> > the
>> > >> > > leader
>> > >> > > > >> jobcoordinator/AM. So, I want to understand the motivation
>> for
>> > >> this
>> > >> > > > >> choice.
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > > Yes, the reason for using a separate coordinate stream
>> message
>> > is
>> > >> > > because
>> > >> > > > > the task-to-sspList mapping is not currently persisted in the
>> > >> > > coordinator
>> > >> > > > > stream. We wouldn't need to create this new stream message if
>> > >> > JobModel
>> > >> > > is
>> > >> > > > > persisted. We need to persist the task-to-sspList mapping in
>> the
>> > >> > > > > coordinator stream so that the job can derive the original
>> > number
>> > >> of
>> > >> > > > > partitions of each input stream regardless of how many times
>> the
>> > >> > > > partition
>> > >> > > > > has expanded. Does this make sense?
>> > >> > > > >
>> > >> > > > > I am not sure how this is related to the locality though. Can
>> > you
>> > >> > > clarify
>> > >> > > > > your question if I haven't answered your question?
>> > >> > > > >
>> > >> > > > > Thanks!
>> > >> > > > > Dong
>> > >> > > > >
>> > >> > > > >
>> > >> > > > >>
>> > >> > > > >> Cheers!
>> > >> > > > >> Navina
>> > >> > > > >>
>> > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <
>> lindon...@gmail.com
>> > >
>> > >> > > wrote:
>> > >> > > > >>
>> > >> > > > >> > Hi all,
>> > >> > > > >> >
>> > >> > > > >> > We created SEP-5: Enable partition expansion of input
>> > streams.
>> > >> > > Please
>> > >> > > > >> find
>> > >> > > > >> > the SEP wiki in the link
>> > >> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
>> > >> > > > >> > 5%3A+Enable+partition+expansion+of+input+streams
>> > >> > > > >> > .
>> > >> > > > >> >
>> > >> > > > >> > You feedback is appreciated!
>> > >> > > > >> >
>> > >> > > > >> > Thanks,
>> > >> > > > >> > Dong
>> > >> > > > >> >
>> > >> > > > >>
>> > >> > > > >
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>

Reply via email to