Thanks, Dong.

The summary looks accurate.

I'll let the others chime in, as I believe my perspective has been
adequately captured in this thread.

-Jake

On Wed, Jun 14, 2017 at 12:12 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jacob,
>
> Thank you for taking so much time to discuss with me! I appreciate the
> discussion and the insight. I will summarize our discussion below.
>
> 1) Whether it is reasonable to store partition-to-task mapping.
>
> We agree that storing the partition-to-task mapping will be reasonable if we
> allow users to specify either the new-partition-to-old-partition mapping or
> the key-to-partition mapping in the future. SEP-5 doesn't currently provide a
> way for users to specify the new-partition-to-old-partition mapping because
> we won't have a good idea of that interface until we try to enable
> partition expansion for input systems other than Kafka. This is currently
> listed as the third future-work item in SEP-5.
>
> And if we decide to implement SEP-5, I will include a warning message
> regarding the use of the partition-to-task mapping, i.e. "this does not
> specify the key-to-task mapping". We agree that this could address the
> concern here.
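
As a rough illustration of that warning (a Python sketch, not Samza code; both
partitioner functions and the four-entry mapping are hypothetical), the same
partition-to-task mapping can yield different key-to-task assignments
depending on how the producer partitions keys:

```python
import zlib

# Hypothetical stored mapping: partition id -> task name.
partition_to_task = {0: "task-0", 1: "task-1", 2: "task-2", 3: "task-3"}


def hash_mod_partitioner(key: str, num_partitions: int) -> int:
    """hash+modulo partitioning, using CRC32 as a deterministic stand-in hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def key_range_partitioner(key: str, num_partitions: int) -> int:
    """Key-range partitioning on the first byte of the key."""
    first_byte = key.encode("utf-8")[0] if key else 0
    return first_byte * num_partitions // 256


def key_to_task(key: str, partitioner) -> str:
    """Resolve the owning task; this needs the partitioner, not just the mapping."""
    partition = partitioner(key, len(partition_to_task))
    return partition_to_task[partition]


if __name__ == "__main__":
    for key in ("user-0", "user-1", "user-2"):
        print(key,
              key_to_task(key, hash_mod_partitioner),
              key_to_task(key, key_range_partitioner))
```

Which task owns a key is decided by the producer's partitioner, so the stored
mapping alone cannot answer that question.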
>
> 2) Whether we should follow the approach in SEP-5 or use an extra
> re-partitioning stage in the stateful Samza job to enable partition
> expansion.
>
> Here are the pros and cons of the extra re-partitioning stage in comparison
> to SEP-5.
>
> Pros:
> - It doesn't require the owner of the Samza job to know the partitioning
> algorithm used for the input stream. If the owner of the Samza job is in
> a different organization than the producer of the input stream, this
> solution frees the organizations from having to coordinate with each
> other.
> - It doesn't require the owner of the Samza job to specify the partitioning
> algorithm used for the input stream, so there is less config.
>
> Cons:
> - Users have to make code changes on their side to use the new fluent API.
> - The extra partitioning stage would potentially increase latency.
> - The extra partitioning stage would incur additional cost due to the extra
> internal topic. The cost is probably not that much with the new trim() API
> in Kafka if Samza uses Kafka to store the internal topic. But the cost may
> be doubled if Samza uses another input system that doesn't provide a trim()
> API to delete data on demand.
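
The extra re-partitioning stage weighed above can be sketched roughly as
follows. This is illustrative Python, not the Samza fluent API:
INTERNAL_PARTITIONS, internal_partition() and repartition() are hypothetical
names, and in-memory lists stand in for the internal topic.

```python
import zlib
from collections import defaultdict

# Assumption: the internal topic's partition count is fixed for the life of
# the job, so it never changes when the input stream is expanded.
INTERNAL_PARTITIONS = 8


def internal_partition(key: str) -> int:
    """Partitioning chosen by the job itself, not by the input producer."""
    return zlib.crc32(key.encode("utf-8")) % INTERNAL_PARTITIONS


def repartition(input_partitions):
    """Copy every (key, value) message into the internal topic.

    input_partitions maps an input partition id to a list of (key, value)
    messages. How the producer spread keys over those partitions is ignored.
    """
    internal = defaultdict(list)
    for messages in input_partitions.values():
        for key, value in messages:
            internal[internal_partition(key)].append((key, value))
    return dict(internal)


if __name__ == "__main__":
    messages = [(f"k{i}", i) for i in range(6)]
    print(repartition({0: messages[:3], 1: messages[3:]}))
```

Every message is written a second time, which is where the extra latency and
storage cost in the cons above come from. In exchange, the stateful stage only
ever sees the job's own partitioning.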
>
> My recommendation is to adopt a hybrid solution, i.e. we still implement
> the current proposal in SEP-5 so that we enable partition expansion without
> incurring extra latency/cost and without requiring users to change their
> code. And we can recommend that users use the extra partitioning stage if
> coordination among different organizations is indeed a concern.
>
> Can other developers also provide feedback regarding your preference
> between the two?
>
> Thanks,
> Dong
>
>
>
>
> On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>
> > Hey Dong,
> >
> > I appreciate your thoughtful responses. Let's do one more round :-)
> >
> >
> > > Here are my current concerns with the three alternatives you described
> > > earlier:
> > > - The first alternative requires support from the input system, which is
> > > currently not available. It will limit the usage of partition expansion
> > > to only systems that support such an interface. And it is not guaranteed
> > > that we can persuade the developers of the input system to add this
> > > interface. This is not desirable for Samza in the long term.
> >
> > Agreed. It is very wishful thinking that each supported system would
> > provide such a contract.
> >
> >
> > >
> > > - I cannot comment on the second alternative because I don't understand
> > > how it reshuffles all existing changelog data. We can discuss more if
> > > there is more specific detail. My gut feeling is that this will be
> > > complex and carry performance overhead.
> >
> > After giving this more thought, I agree. There is no clear way to migrate
> > a changelog without knowing the original key->partition mapping. Which
> > leads us to alternative 3...
> >
> >
> > >
> > > - The third alternative incurs performance overhead. Given that users
> > > can already use this solution to enable partition expansion, maybe Samza
> > > developers can provide more input as to why we are not doing it by
> > > default. My gut feeling is that it carries considerable performance
> > > overhead and increases the cost to serve a Samza job (e.g. disk usage),
> > > which may make it undesirable in the long term.
> >
> > I think the only performance overhead would be the mandatory
> > repartitioning stage for stateful jobs. But a repartitioner is usually
> > much faster than the downstream stateful job, so it seems to be only a
> > cost-to-serve issue.
> >
> > As for why we aren't already doing this, I would posit that before the
> > introduction of the high level API, which trivializes repartitioning, it
> > was unreasonable to expect each job owner to do the mandatory
> > repartitioning. With the high level API, I would argue this is much more
> > doable.
> >
> >
> > > I am not sure it is true that "any future feature that utilizes this
> > > mapping without accounting for the assumptions of this SEP is likely to
> > > malfunction". Suppose we allow users to specify a new-to-old-partition
> > > mapping; then we can use the partition-to-task mapping correctly
> > > without relying on the assumptions in this SEP, right?
> >
> > Right, but my point was that the partition->task mapping is not
> > sufficient by itself. So adding it by itself is potentially misleading.
> >
> > On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Thanks for the reply Jacob. Please see my comment inline.
> > >
> > > On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <jacob.m...@gmail.com>
> > wrote:
> > >
> > > > >
> > > > > - For users that need partition expansion of the input streams for
> > > > > stateful jobs, they have a really big headache in the sense that
> > > > > Samza does not allow partition expansion for stateful jobs. SEP-5
> > > > > addresses this headache for them.
> > > > > You are right that SEP-5 requires users to understand and enforce
> > > > > limitations across organizations. But it is still much better than
> > > > > not allowing users to expand partitions for stateful jobs at all,
> > > > > right? Did I miss something here?
> > > >
> > > > I guess this one is a matter of perspective.
> > > >
> > > > One argument is that if the system supports one case, it's better
> > > > than none because there is one less scenario in which the system does
> > > > the wrong thing.
> > > >
> > > > The counter argument is for uniform and consistent behavior, which is
> > > > easy for users to understand and properly leverage.
> > > >
> > > > Specifically, I'd argue that the current rule is very simple: "you
> > > > cannot repartition inputs on a stateful job, so you must
> > > > over-partition the initial implementation". To me, while that rule is
> > > > not ideal, its simplicity is better than introducing a new solution
> > > > that has a bunch of caveats, any one of which could be missed. If any
> > > > one of the assumptions in this SEP design is violated, the job would
> > > > behave incorrectly. That puts a lot more burden on the users than the
> > > > simpler rule.
> > > >
> > >
> > > I agree that we have different perspectives here. It is true that users
> > > would mess up their job if they used this feature in the wrong way, i.e.
> > > violated the assumptions made in SEP-5. On the other hand, I think there
> > > is always a way for users to mess up their job if they configure the
> > > Samza job incorrectly. I also think the assumptions made in this SEP are
> > > not particularly harder to understand than other existing configs in
> > > Samza.
> > >
> > > The answer to this can be subjective. I would love to hear perspectives
> > > from other developers on this issue.
> > >
> > >
> > > >
> > > > That's why I mentioned a few alternatives that, while more complex to
> > > > implement, would provide a more consistent behavior with simple rules
> > for
> > > > the users.
> > > >
> > >
> > > I am open to discussing alternative solutions that can address the
> > > problem in a better manner. I am not opposed to complexity as long as it
> > > gives us good long-term benefits.
> > >
> > > Here are my current concerns with the three alternatives you described
> > > earlier:
> > >
> > > - The first alternative requires support from the input system, which is
> > > currently not available. It will limit the usage of partition expansion
> > > to only systems that support such an interface. And it is not guaranteed
> > > that we can persuade the developers of the input system to add this
> > > interface. This is not desirable for Samza in the long term.
> > >
> > > - I cannot comment on the second alternative because I don't understand
> > > how it reshuffles all existing changelog data. We can discuss more if
> > > there is more specific detail. My gut feeling is that this will be
> > > complex and carry performance overhead.
> > >
> > > - The third alternative incurs performance overhead. Given that users
> > > can already use this solution to enable partition expansion, maybe Samza
> > > developers can provide more input as to why we are not doing it by
> > > default. My gut feeling is that it carries considerable performance
> > > overhead and increases the cost to serve a Samza job (e.g. disk usage),
> > > which may make it undesirable in the long term.
> > >
> > >
> > >
> > > >
> > > > > Yes, we need a similar check for
> > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. If more
> > > > > grouper classes are needed in the future, we can solve this problem
> > > > > cleanly without a new config. Given the previousGrouperClass and
> > > > > newGrouperClass, KafkaCheckpointLogKey will throw an exception if
> > > > > and only if newGrouperClass is not an instance of
> > > > > previousGrouperClass.
> > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend
> > > > > GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum
> > > > > should extend GroupByPartition. Does this address your concern?
> > > >
> > > > Sounds workable, thanks.
> > > >
> > > > >
> > > > > Can
> > > > > you be more specific why Partition-to-task mapping is not
> meaningful
> > > > > without
> > > > > some definition of the key-to-partition assignments and why it is
> > > > > incomplete and misleading?
> > > >
> > > > A partition is (in my naive interpretation) an independent queue for
> > > > messages of a particular key set. It is not the *identity* of the
> > > > partition that determines the contents of the associated task's local
> > > > state. Rather, it is the *contents* of the partition that affect the
> > > > task's state. A partition-to-task mapping only captures an identity
> > > > relationship: partition1->task1. Without the assumptions of this SEP,
> > > > this is insufficient to determine the assignment of keys to tasks,
> > > > which is what really matters. Therefore, any future feature that
> > > > utilizes this mapping without accounting for the assumptions of this
> > > > SEP is likely to malfunction.
> > > >
> > > >
> > > I am not sure it is true that "any future feature that utilizes this
> > > mapping without accounting for the assumptions of this SEP is likely to
> > > malfunction". Suppose we allow users to specify a new-to-old-partition
> > > mapping; then we can use the partition-to-task mapping correctly
> > > without relying on the assumptions in this SEP, right?
> > >
> > >
> > > >
> > > > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > > > Hey Jacob,
> > > > >
> > > > > Thanks for the explanation. It seems that your biggest concern is
> > with
> > > > the
> > > > > generality of the proposal. Let me try to address this and other
> > > comments
> > > > > below.
> > > > >
> > > > > 1) ... it will cause headaches for Samza users ...
> > > > >
> > > > > I am not sure I understand why this proposal causes headaches for
> > > > > Samza users. Here is the impact of SEP-5 on users:
> > > > >
> > > > > - For users that do not need partition expansion of the input
> > > > > stream, they can use Samza without any change in code/binary/config.
> > > > > Thus there is no headache for them.
> > > > >
> > > > > - For users that need partition expansion of the input streams for
> > > > > stateless jobs, they currently need to manually reboot their Samza
> > > > > job in order to let Samza consume the new partitions created for the
> > > > > stream. SEP-5 actually reduces their headache by allowing Samza to
> > > > > automatically detect and consume new partitions.
> > > > >
> > > > > - For users that need partition expansion of the input streams for
> > > > > stateful jobs, they have a really big headache in the sense that
> > > > > Samza does not allow partition expansion for stateful jobs. SEP-5
> > > > > addresses this headache for them.
> > > > >
> > > > > You are right that SEP-5 requires users to understand and enforce
> > > > > limitations across organizations. But it is still much better than
> > > > > not allowing users to expand partitions for stateful jobs at all,
> > > > > right? Did I miss something here?
> > > > >
> > > > > 2) ... Separate orgs are often difficult to coordinate and a system
> > > which
> > > > > depends on such significant process/coordination is too fragile for
> > my
> > > > > taste ..
> > > > >
> > > > > This is true. Ideally we want a system that is fully self-serving.
> I
> > > > think
> > > > > this is a long term goal for Samza. Still, for the reasons
> described
> > > > above,
> > > > > I think something is better than nothing. I am open to alternative
> > > design
> > > > > that can support partition expansion for stateful jobs without
> > > requiring
> > > > > coordination.
> > > > >
> > > > > 3) There is currently no supported way of sharing state among the
> > tasks
> > > > of
> > > > > a container.  Each task has its own isolated store and that logical
> > > > > isolation is the primary thing that enables Samza jobs to scale
> with
> > a
> > > > > simple container count change. My feeling is that we should not
> > change
> > > > > this without
> > > > > good reason.
> > > > >
> > > > > I see your point. I will remove this sentence from the motivation
> > > > section.
> > > > > This won't have any impact on the design of the SEP-5. Does this
> > > address
> > > > > the problem?
> > > > >
> > > > > 4) With the current proposal, we'd also need a similar check for
> > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any
> > other
> > > > > groupers
> > > > > were later added with both these modes, we'd probably need to add
> > those
> > > > > too. It might be easier and cleaner to add a config to ignore that
> > > check
> > > > > temporarily. Down side is that it further complicates the Samza
> > config,
> > > > > which is already huge. Thoughts?
> > > > >
> > > > > Yes, we need a similar check for
> > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. If more
> > > > > grouper classes are needed in the future, we can solve this problem
> > > > > cleanly without a new config. Given the previousGrouperClass and
> > > > > newGrouperClass, KafkaCheckpointLogKey will throw an exception if
> > > > > and only if newGrouperClass is not an instance of
> > > > > previousGrouperClass.
> > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend
> > > > > GroupBySystemStreamPartition and GroupByPartitionWithFixedTaskNum
> > > > > should extend GroupByPartition. Does this address your concern?
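
A minimal sketch of the check described above, in illustrative Python rather
than the actual KafkaCheckpointLogKey code. It assumes the rule is read as
"accept the switch only when the new grouper class is the previous class or a
subclass of it", which is the reading under which the proposed inheritance
makes GroupByPartition -> GroupByPartitionWithFixedTaskNum legal. The class
names come from this thread; check_grouper_change() and GrouperChangeError are
hypothetical.

```python
class GroupByPartition:
    pass


class GroupByPartitionWithFixedTaskNum(GroupByPartition):
    pass


class GroupBySystemStreamPartition:
    pass


class GroupBySystemStreamPartitionWithFixedTaskNum(GroupBySystemStreamPartition):
    pass


class GrouperChangeError(Exception):
    """Hypothetical stand-in for the differing-grouper exception."""


def check_grouper_change(previous_grouper_class, new_grouper_class):
    """Raise unless the new grouper is the previous one or extends it."""
    if not issubclass(new_grouper_class, previous_grouper_class):
        raise GrouperChangeError(
            f"cannot switch from {previous_grouper_class.__name__} "
            f"to {new_grouper_class.__name__}"
        )


if __name__ == "__main__":
    # Allowed: the fixed-task-num grouper extends the original grouper.
    check_grouper_change(GroupByPartition, GroupByPartitionWithFixedTaskNum)
    # Rejected: unrelated grouper families.
    try:
        check_grouper_change(GroupByPartition, GroupBySystemStreamPartition)
    except GrouperChangeError as err:
        print(err)
```

Because the rule is expressed through inheritance, a future grouper pair needs
no new config and no new hardcoded case.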
> > > > >
> > > > > 5) The task-to-container and container-to-host mappings are both
> > > > meaningful
> > > > > in context of the JobModel. Partition-to-task mapping is not
> > meaningful
> > > > > without
> > > > > some definition of the key-to-partition assignments. It's
> incomplete
> > > > > information and therefore misleading. I think it only makes sense
> to
> > > use
> > > > > this mapping if we adopt a solution wherein Samza also knows the
> > > > partition
> > > > > key assignment.
> > > > >
> > > > > Partition-to-task is currently explicitly passed from job
> coordinator
> > > to
> > > > > each task as part of the job model to tell tasks which partitions
> to
> > > > > consume from. I think we can store some definition of the
> > > > key-to-partition
> > > > > assignments if Samza decides to get and use this information in the
> > > > > future. Can
> > > > > you be more specific why Partition-to-task mapping is not
> meaningful
> > > > > without
> > > > > some definition of the key-to-partition assignments and why it is
> > > > > incomplete and misleading?
> > > > >
> > > > >
> > > > > Thanks,
> > > > > Dong
> > > > >
> > > > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <jacob.m...@gmail.com>
> > > > wrote:
> > > > >
> > > > > > Hey Dong,
> > > > > >
> > > > > > I'm opposed (or a +0, at best) to this limited, Kafka-specific
> > > > solution.
> > > > > I
> > > > > > understand that the proposal is relatively simple to implement,
> > but I
> > > > > think
> > > > > > it will cause headaches for Samza users. They will not only have
> to
> > > > > > understand all the limitations (increase only, double partitions
> > > only,
> > > > > > partition using hash+modulo, etc) of this approach, but enforcing
> > > these
> > > > > > limitations can be a major problem, especially when the Samza
> jobs
> > > and
> > > > > > message brokers are managed by separate orgs in a company.
> Separate
> > > > orgs
> > > > > > are often difficult to coordinate and a system which depends on
> > such
> > > > > > significant process/coordination is too fragile for my taste.
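
For readers following the limitations listed above, here is a small Python
sketch (not Samza or Kafka code; all function names are hypothetical and CRC32
is only a stand-in hash) of why hash+modulo partitioning combined with
doubling the partition count keeps old and new partitions aligned: a key that
lands in new partition p always came from old partition p % old_count.

```python
import zlib


def key_hash(key: str) -> int:
    """Deterministic stand-in for the producer's hash function."""
    return zlib.crc32(key.encode("utf-8"))


def partition_of(hash_value: int, num_partitions: int) -> int:
    """hash+modulo partitioning."""
    return hash_value % num_partitions


def old_partition_of(new_partition: int, old_count: int) -> int:
    """Old partition that fed a given new partition after expansion."""
    return new_partition % old_count


def expansion_preserves_routing(hashes, old_count: int, new_count: int) -> bool:
    """True if every hash maps back to the old partition it used to land in."""
    return all(
        partition_of(h, old_count)
        == old_partition_of(partition_of(h, new_count), old_count)
        for h in hashes
    )


if __name__ == "__main__":
    hashes = [key_hash(f"key-{i}") for i in range(1000)]
    print("4 -> 8:", expansion_preserves_routing(hashes, 4, 8))
    print("4 -> 6:", expansion_preserves_routing(hashes, 4, 6))
```

The property holds whenever the new count is a multiple of the old count,
since (h mod kN) mod N equals h mod N. It can fail otherwise, for example when
going from 4 to 6 partitions.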
> > > > > >
> > > > > > That said, I realize that my opinion is just one of many in the
> > > broader
> > > > > > community which may feel differently, so let me respond to some
> of
> > > the
> > > > > > other items in the discussion so we can clear them up:
> > > > > >
> > > > > > The task-to-container assignment matters because if the
> correlated
> > > > tasks
> > > > > > > (i.e. tasks that consume messages with the same key) needs to
> be
> > in
> > > > the
> > > > > > > same container so that they can share the same key/value local
> > > store
> > > > on
> > > > > > the
> > > > > > > same physical machine.
> > > > > >
> > > > > > There is currently no supported way of sharing state among the
> > tasks
> > > > of a
> > > > > > container.  Each task has its own isolated store and that logical
> > > > > isolation
> > > > > > is the primary thing that enables Samza jobs to scale with a
> simple
> > > > > > container count change. My feeling is that we should not change
> > this
> > > > > > without good reason.
> > > > > >
> > > > > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala
> > such
> > > > > that
> > > > > > > exception will not be thrown if new grouper is
> > > > > > > GroupByPartitionWithFixedTaskNum and old grouper is
> > > > GroupByPartition.
> > > > > > Does
> > > > > > > this look reasonable?
> > > > > >
> > > > > > With the current proposal, we'd also need a similar check for
> > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any
> > > other
> > > > > > groupers were later added with both these modes, we'd probably
> need
> > > to
> > > > > add
> > > > > > those too. It might be easier and cleaner to add a config to
> ignore
> > > > that
> > > > > > check temporarily. Down side is that it further complicates the
> > Samza
> > > > > > config, which is already huge. Thoughts?
> > > > > >
> > > > > > I think storing the previous task-to-partition mapping is more
> > > general
> > > > > than
> > > > > > > storing the partition count of all topics for the following
> > > reasons:
> > > > > > > - Samza already stores the task-to-container mapping and
> > > > > > container-to-host
> > > > > > > mapping in the coordinator stream. It seems consistent to also
> > > store
> > > > > the
> > > > > > > partition-to-task mapping. And this information may be useful
> for
> > > > other
> > > > > > > use-case such as debugging.
> > > > > > > - By having the new interface take the previous
> task-to-partition
> > > > > > > assignment instead of a topic-to-partition-count mapping as new
> > > > > > parameter,
> > > > > > > we can potentially have grouper implementation to support other
> > > types
> > > > > of
> > > > > > > input systems.
> > > > > > > - It is slightly simpler to store the task-to-partition
> > > > > > > assignment because we don't need to know whether this is the
> > > > > > > first time a job is started or not. On the other hand, you can
> > > > > > > write topic-to-partition-count mapping to the coordinator stream
> > > > > > > only if this is the first time the job is run
> > > > > >
> > > > > > The task-to-container and container-to-host mappings are both
> > > > meaningful
> > > > > in
> > > > > > context of the JobModel. Partition-to-task mapping is not
> > meaningful
> > > > > > without some definition of the key-to-partition assignments. It's
> > > > > > incomplete information and therefore misleading. I think it only
> > > makes
> > > > > > sense to use this mapping if we adopt a solution wherein Samza
> also
> > > > knows
> > > > > > the partition key assignment.
> > > > > >
> > > > > > -Jake
> > > > > >
> > > > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hey Jacob,
> > > > > > >
> > > > > > > Thanks for taking time to review the SEP.
> > > > > > >
> > > > > > > I agree with you and Navina that the current SEP doesn't
> provide
> > > > > support
> > > > > > to
> > > > > > > arbitrary input systems and it doesn't support partition
> shrink.
> > I
> > > > > think
> > > > > > > the scope of this SEP is to support partition expansion for
> Kafka
> > > > (the
> > > > > > most
> > > > > > > widely used input system of Samza) and keep the door open for
> > > > partition
> > > > > > > support of various input systems. The current design can
> support
> > > any
> > > > > > system
> > > > > > > that meets the two operational requirement specified in the
> doc.
> > > > > > >
> > > > > > > While it is possible to support more types of input systems, it
> > > will
> > > > > > likely
> > > > > > > add more complexity to the design. For example, the first
> > > alternative
> > > > > > > solution from you requires broker-side support to negotiate
> hash
> > > > > > algorithm.
> > > > > > > The second alternative solution requires changelog partition
> > > > reshuffle
> > > > > > > which carries its own design complexity and performance
> overhead.
> > > > There
> > > > > > is
> > > > > > > tradeoff between the generality and the complexity among these
> > > > > choices. I
> > > > > > > like the current design because it is simple and addresses a
> big
> > > > usage
> > > > > > > scenario for us. We can add more complexity to generalize the
> > > design
> > > > if
> > > > > > it
> > > > > > > enables important use-case. Does this sound reasonable?
> > > > > > >
> > > > > > > Note that the "Rejected Alternative" section also mentions the
> > > > > > possibility
> > > > > > > of supporting a wider range of input systems by allowing user
> to
> > > > > specify
> > > > > > > the new-partition to old-partition mapping. We are not doing it
> > > > because
> > > > > > 1)
> > > > > > > we may have better understanding of the design after we have a
> > > > specific
> > > > > > > second input system to support 2) the current design can be
> > > extended
> > > > to
> > > > > > > support general input systems. I think similar argument can be
> > > > applied
> > > > > > > explain why we don't have to support general input systems
> using
> > > the
> > > > > > > potentially-good alternatives you mentioned.
> > > > > > >
> > > > > > > I hope SEP-5 can be an important first-step towards supporting
> > > > > partition
> > > > > > > expansion of any input system.
> > > > > > >
> > > > > > > To answer your questions about the current proposal:
> > > > > > >
> > > > > > > >1. "An alternative solution is to allow task number to
> increase
> > > > after
> > > > > > > >partition expansion and uses a proper task-to-container
> > assignment
> > > > to
> > > > > > make
> > > > > > > >sure the Samza output is correct." What does the container
> have
> > to
> > > > do
> > > > > > with
> > > > > > > >stateful processing or output in general?
> > > > > > >
> > > > > > > The task-to-container assignment matters because if the
> > correlated
> > > > > tasks
> > > > > > > (i.e. tasks that consume messages with the same key) needs to
> be
> > in
> > > > the
> > > > > > > same container so that they can share the same key/value local
> > > store
> > > > on
> > > > > > the
> > > > > > > same physical machine.
> > > > > > >
> > > > > > > >2. When you use "Join" as an example, you basically mean
> > multiple
> > > > > > > >co-partitioned streams, right? This is opposed to multiple,
> > > > > > > >independently-partitioned streams or a single stream. Would be
> > > nice
> > > > to
> > > > > > > >formulate the proposal in these more general terms.
> > > > > > >
> > > > > > > I thought "join" is commonly used to refer to the join operation
> > > > > > > with co-partitioned streams, but I may be wrong. I have updated
> > > > > > > the wiki to explicitly mention "co-partitioned stream". Does this
> > > > > > > look better now?
> > > > > > >
> > > > > > > >3. When switching SSP groupers, how will the users avoid the
> > > > > > > >org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > > > > > >exception?
> > > > > > >
> > > > > > > I think we can hardcode new logic in
> KafkaCheckpointLogKey.scala
> > > such
> > > > > > that
> > > > > > > exception will not be thrown if new grouper is
> > > > > > > GroupByPartitionWithFixedTaskNum and old grouper is
> > > > GroupByPartition.
> > > > > > Does
> > > > > > > this look reasonable?
> > > > > > >
> > > > > > > >4. Partition to task assignment is meaningless without key to
> > > > > partition
> > > > > > > >mapping. The real semantics are captured in the external
> > > requirement
> > > > > for
> > > > > > > >partitioning via hash+modulo. But in that case, iiuc, only the
> > > > > partition
> > > > > > > >count matters. So why not just store the original partition
> > count
> > > > > rather
> > > > > > > >than the whole mapping?
> > > > > > >
> > > > > > > I think storing the previous task-to-partition mapping is more
> > > > general
> > > > > > than
> > > > > > > storing the partition count of all topics for the following
> > > reasons:
> > > > > > >
> > > > > > > - Samza already stores the task-to-container mapping and
> > > > > > container-to-host
> > > > > > > mapping in the coordinator stream. It seems consistent to also
> > > store
> > > > > the
> > > > > > > partition-to-task mapping. And this information may be useful
> for
> > > > other
> > > > > > > use-case such as debugging.
> > > > > > >
> > > > > > > - By having the new interface take the previous
> task-to-partition
> > > > > > > assignment instead of a topic-to-partition-count mapping as new
> > > > > > parameter,
> > > > > > > we can potentially have grouper implementation to support other
> > > types
> > > > > of
> > > > > > > input systems.
> > > > > > >
> > > > > > > - It is slightly simpler to store the task-to-partition
> > > > > > > assignment because we don't need to know whether this is the
> > > > > > > first time a job is started or not. On the other hand, you can
> > > > > > > write topic-to-partition-count mapping to the coordinator stream
> > > > > > > only if this is the first time the job is run
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <
> > jacob.m...@gmail.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Dong,
> > > > > > > >
> > > > > > > > Thanks for the SEP. Supporting partition changes is
> critically
> > > > > > important
> > > > > > > > for stateful Samza jobs, so it's great to see some ideas on
> > that
> > > > > front!
> > > > > > > >
> > > > > > > > Sorry for the late feedback, but I have a few thoughts to
> > > > contribute.
> > > > > > > >
> > > > > > > > Big +1 on Navina's comment:
> > > > > > > >
> > > > > > > > > My biggest gripe with this SEP is that it seems like a
> > > > tailor-made
> > > > > > > > > solution
> > > > > > > > > that relies on the semantics of the Kafka system and yet,
> we
> > > are
> > > > > > trying
> > > > > > > > to
> > > > > > > > > masquerade that as operational requirements for other
> systems
> > > > > > > interacting
> > > > > > > > > with Samza. (Not to say that this is the first time such a
> > > choice
> > > > > is
> > > > > > > > being
> > > > > > > > > made in the Samza design). I am not seeing how this can be a
> > > > > > > > > "general" solution for all input systems. That's my two cents. I
> would
> > > like
> > > > > to
> > > > > > > hear
> > > > > > > > > alternative points of view for this from other devs.
> > > > > > > >
> > > > > > > >
> > > > > > > > Two examples of this:
> > > > > > > > 1. This is mostly a hypothetical, but some message brokers
> may
> > > use
> > > > > key
> > > > > > > > range assignment rather than hash+modulo.
> > > > > > > > 2. Kafka can't reduce the number of partitions, but it can
> > happen
> > > > on
> > > > > > > other
> > > > > > > > systems. For example, it may be cheaper to reduce the number
> of
> > > > > > > partitions
> > > > > > > > on a hosted service where the cost model depends on the
> number
> > of
> > > > > > > > partitions/shards.
> > > > > > > >
> > > > > > > > It seems to me that a solution which doesn't depend on partition
> > > > > > > > key assignment in the message broker would be preferable. Here
> > > > > > > > are a few alternatives that weren't discussed and I think should
> > > > > > > > be considered:
> > > > > > > >
> > > > > > > > Alternatives in order of increasing preference:
> > > > > > > > 1. Samza manages the partition hash (via some new contract
> with
> > > the
> > > > > > > > brokers) and guarantees correct routing of keys among the new
> > > > > > partitions.
> > > > > > > > 2. Samza detects a task count change, creates a new changelog
> > > with
> > > > > > > correct
> > > > > > > > partitions, and *somehow* reshuffles all existing changelog
> > data
> > > > into
> > > > > > the
> > > > > > > > new topic and then uses the new topic from then on. (doesn't
> > work
> > > > > > without
> > > > > > > > changelog, but in that case durability isn't paramount, so we
> > can
> > > > > just
> > > > > > > > wipe)
> > > > > > > > 3. Use RPC in between stages and samza fully manages key
> > > assignment
> > > > > > among
> > > > > > > > tasks. No on-disk topic data to clean up. Mandatory
> > > repartitioning
> > > > in
> > > > > > the
> > > > > > > > first stage to pre-scaled tasks in next stage.
> > > > > > > > 4. Combined 2-3 solution
> > > > > > > >
> > > > > > > > Finally, some questions about the current proposal:
> > > > > > > > 1. "An alternative solution is to allow task number to
> increase
> > > > after
> > > > > > > > partition expansion and uses a proper task-to-container
> > > assignment
> > > > to
> > > > > > > make
> > > > > > > > sure the Samza output is correct." What does the container
> have
> > > to
> > > > do
> > > > > > > with
> > > > > > > > stateful processing or output in general?
> > > > > > > > 2. When you use "Join" as an example, you basically mean
> > multiple
> > > > > > > > co-partitioned streams, right? This is opposed to multiple,
> > > > > > > > independently-partitioned streams or a single stream. Would
> be
> > > nice
> > > > > to
> > > > > > > > formulate the proposal in these more general terms.
> > > > > > > > 3. When switching SSP groupers, how will the users avoid the
> > > > > > > > > org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartitionGrouperFactoryValues
> > > > > > > > exception?
> > > > > > > > 4. Partition to task assignment is meaningless without key to
> > > > > partition
> > > > > > > > mapping. The real semantics are captured in the external
> > > > requirement
> > > > > > for
> > > > > > > > partitioning via hash+modulo. But in that case, iiuc, only
> the
> > > > > > partition
> > > > > > > > count matters. So why not just store the original partition
> > count
> > > > > > rather
> > > > > > > > than the whole mapping?
> > > > > > > >
> > > > > > > > -Jake
> > > > > > > >
> > > > > > > > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <
> lindon...@gmail.com
> > >
> > > > > wrote:
> > > > > > > >
> > > > > > > > > Hey Yi, Navina,
> > > > > > > > >
> > > > > > > > > I have updated the SEP-5 document based on our discussion.
> > The
> > > > > > > difference
> > > > > > > > > can be found here
> > > > > > > > > > <https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=70255476&selectedPageVersions=14&selectedPageVersions=15>.
> > > > > > > > > Here is the summary of changes:
> > > > > > > > >
> > > > > > > > > - Add new interface that extends the existing interface
> > > > > > > > > SystemStreamPartitionGrouper. Newly-added grouper class
> > should
> > > > > > > implement
> > > > > > > > > this interface.
> > > > > > > > > - Explained in the Rejected Alternative Section why we
> don't
> > > add
> > > > > new
> > > > > > > > method
> > > > > > > > > in the existing interface
> > > > > > > > > - Explained in the Rejected Alternative Section why we
> don't add
> > > > > > > config/class
> > > > > > > > > for user to specify new-partition to old-partition mapping.
> > > > > > > > >
> > > > > > > > > Can you take another look at the proposal and let me know
> if
> > > > there
> > > > > is
> > > > > > > any
> > > > > > > > > concern?
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Dong
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <
> > lindon...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hey Yi,
> > > > > > > > > >
> > > > > > > > > > Thanks much for the comment. I have updated the doc to
> > > address
> > > > > all
> > > > > > > your
> > > > > > > > > > comments except the one related to the interface. I am
> not
> > > > sure I
> > > > > > > > > > understand your suggestion of the new interface. Will
> > discuss
> > > > > > > tomorrow.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Dong
> > > > > > > > > >
> > > > > > > > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <
> > nickpa...@gmail.com
> > > >
> > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > >> Hi, Dong,
> > > > > > > > > >>
> > > > > > > > > > >> Thanks for the detailed design doc for a long-awaited
> > feature
> > > > in
> > > > > > > Samza!
> > > > > > > > > >> Really appreciate it! I did a quick pass and have the
> > > > following
> > > > > > > > > comments:
> > > > > > > > > >>
> > > > > > > > > >> - minor: "limit the maximum size of partition" ==>
> "limit
> > > the
> > > > > > > maximum
> > > > > > > > > size
> > > > > > > > > >> of each partition"
> > > > > > > > > >> - "However, Samza currently is not able to handle
> > partition
> > > > > > > expansion
> > > > > > > > of
> > > > > > > > > >> the input streams"==>better point out "for stateful
> jobs".
> > > For
> > > > > > > > stateless
> > > > > > > > > >> jobs, simply bouncing the job now can pick up the new
> > > > > partitions.
> > > > > > > > > >> - "it is possible (e.g. with Kafka) that messages with a
> > > given
> > > > > key
> > > > > > > > > exists
> > > > > > > > > > >> in both partition 1 and 3. Because GroupByPartition will
> > > assign
> > > > > > > > > partition 1
> > > > > > > > > >> and 3 to different tasks, messages with the same key may
> > be
> > > > > > handled
> > > > > > > by
> > > > > > > > > >> different task/container/process and their state will be
> > > > stored
> > > > > in
> > > > > > > > > >> different changelog partition." The problem statement is
> > not
> > > > > super
> > > > > > > > clear
> > > > > > > > > > >> here. The issue with stateful jobs is: after
> > > GroupByPartition
> > > > > > > assign
> > > > > > > > > >> partition 1 and 3 to different tasks, the new task
> > handling
> > > > > > > partition
> > > > > > > > 3
> > > > > > > > > >> does not have the previous state to resume the work.
> e.g.
> > a
> > > > > > page-key
> > > > > > > > > based
> > > > > > > > > >> counter would start from 0 in the new task for a
> specific
> > > key,
> > > > > > > instead
> > > > > > > > > of
> > > > > > > > > >> resuming the previous count 50 held by task 1.
> > > > > > > > > >> - minor rewording: "the first solution in this doc" ==>
> > "the
> > > > > > > solution
> > > > > > > > > >> proposed in this doc"
> > > > > > > > > >> - "Thus additional development work is needed in Kafka
> to
> > > meet
> > > > > > this
> > > > > > > > > >> requirement" It would be good to link to a KIP if and
> when
> > > it
> > > > > > exists
> > > > > > > > > >> - Instead of touching/deprecating the interface
> > > > > > > > > >> SystemStreamPartitionGrouper, I would recommend to have
> a
> > > > > > different
> > > > > > > > > >> implementation class of the interface, which in the
> > > > constructor
> > > > > of
> > > > > > > the
> > > > > > > > > >> grouper, takes two parameters: a) the previous task
> number
> > > > read
> > > > > > from
> > > > > > > > the
> > > > > > > > > >> coordinator stream; b) the configured new-partition to
> > > > > > old-partition
> > > > > > > > > >> mapping policy. Then, the grouper's interface method
> stays
> > > the
> > > > > > same
> > > > > > > > and
> > > > > > > > > >> the
> > > > > > > > > >> behavior of the grouper is more configurable which is
> good
> > > to
> > > > > > > support
> > > > > > > > a
> > > > > > > > > >> broader set of use cases in addition to Kafka's built-in
> > > > > partition
> > > > > > > > > >> expansion policies.
> > > > > > > > > >> - Minor renaming suggestion to the new grouper class
> > names:
> > > > > > > > > >> GroupByPartitionWithFixedTaskNum
> > > > > > > > > >> and GroupBySystemStreamPartitionWithFixedTaskNum
> > > > > > > > > >>
> > > > > > > > > >> Thanks!
> > > > > > > > > >>
> > > > > > > > > >> - Yi
> > > > > > > > > >>
> > > > > > > > > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <
> > > > lindon...@gmail.com
> > > > > >
> > > > > > > > wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hey Navina,
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks much for the comment. Please see my response
> > below.
> > > > > > > > > >> >
> > > > > > > > > >> > Regarding your biggest gripe with the SEP, I
> personally
> > > > think
> > > > > > the
> > > > > > > > > > >> > operational requirements proposed in the SEP are pretty
> > > > general
> > > > > > and
> > > > > > > > > >> could be
> > > > > > > > > >> > easily enforced by other systems. The reason is that
> the
> > > > > modulo
> > > > > > > > > >> operation
> > > > > > > > > >> > is pretty standard and the default option when we
> choose
> > > > > > > partition.
> > > > > > > > > And
> > > > > > > > > >> > usually the underlying system allows user to select
> > > > arbitrary
> > > > > > > > > partition
> > > > > > > > > >> > number if it supports partition expansion. Do you know
> > any
> > > > > > system
> > > > > > > > that
> > > > > > > > > >> does
> > > > > > > > > > >> > not meet these two requirements?
> > > > > > > > > >> >
> > > > > > > > > >> > Regarding your comment of the Motivation section, I
> > > renamed
> > > > > the
> > > > > > > > first
> > > > > > > > > >> > section as "Problem and Goal" and specified that "*The
> > > goal
> > > > of
> > > > > > > this
> > > > > > > > > >> > proposal is to enable partition expansion of the input
> > > > > > > streams*.". I
> > > > > > > > > >> also
> > > > > > > > > >> > put a sentence at the end of the Motivation section
> that
> > > > "*The
> > > > > > > > feature
> > > > > > > > > >> of
> > > > > > > > > >> > task expansion is out of the scope of this proposal
> and
> > > will
> > > > > be
> > > > > > > > > >> addressed
> > > > > > > > > >> > in a future SEP*". The second paragraph in the
> > Motivation
> > > > > > section
> > > > > > > is
> > > > > > > > > >> mainly
> > > > > > > > > >> > used to explain the thinking process that we have gone
> > > > > through,
> > > > > > > what
> > > > > > > > > >> other
> > > > > > > > > > >> > alternatives we have considered, and what we plan to do in
> > Samza
> > > > in
> > > > > > > the
> > > > > > > > > next
> > > > > > > > > >> step.
> > > > > > > > > >> >
> > > > > > > > > >> > To answer your question why increasing the partition
> > > number
> > > > > will
> > > > > > > > > >> increase
> > > > > > > > > >> > the throughput of the kafka consumer in the container,
> > > Kafka
> > > > > > > > consumer
> > > > > > > > > >> can
> > > > > > > > > >> > potentially fetch more data in one FetchResponse with
> > more
> > > > > > > > partitions
> > > > > > > > > in
> > > > > > > > > >> > the FetchRequest. This is because we limit the maximum
> > > > amount
> > > > > of
> > > > > > > > data
> > > > > > > > > >> that
> > > > > > > > > > >> > can be fetched for a given partition in the
> FetchResponse.
> > > > This
> > > > > by
> > > > > > > > > >> default is
> > > > > > > > > > >> > set to 1 MB. And there is a reason that we cannot
> > > arbitrarily
> > > > > > bump
> > > > > > > up
> > > > > > > > > >> this
> > > > > > > > > >> > limit.
> > > > > > > > > >> >
> > > > > > > > > >> > To answer your question how partition expansion in
> Kafka
> > > > > impacts
> > > > > > > the
> > > > > > > > > >> > clients, Kafka consumer is able to automatically
> detect
> > > new
> > > > > > > > partition
> > > > > > > > > of
> > > > > > > > > >> > the topic and reassign all (both old and new)
> partitions
> > > > > across
> > > > > > > > > >> consumers
> > > > > > > > > >> > in the consumer group IF you tell consumer the topic
> to
> > be
> > > > > > > > subscribed.
> > > > > > > > > >> But
> > > > > > > > > >> > consumer in Samza's container uses another way of
> > > > > subscription.
> > > > > > > > > Instead
> > > > > > > > > >> of
> > > > > > > > > >> > subscribing to the topic, the consumer in Samza's
> > > container
> > > > > > > > subscribes
> > > > > > > > > >> to
> > > > > > > > > >> > the specific partitions of the topic. In this case, if
> > new
> > > > > > > > partitions
> > > > > > > > > >> have
> > > > > > > > > >> > been added, Samza will need to explicitly subscribe to
> > the
> > > > new
> > > > > > > > > >> partitions
> > > > > > > > > >> > of the topic. The "Handle partition expansion while
> > tasks
> > > > are
> > > > > > > > running"
> > > > > > > > > >> > section in the SEP addresses this issue in Samza -- it
> > > > > > > recalculates
> > > > > > > > > the
> > > > > > > > > >> job
> > > > > > > > > > >> > model and restarts the container so that the consumer can
> > subscribe
> > > > to
> > > > > > the
> > > > > > > > new
> > > > > > > > > >> > partitions.
> > > > > > > > > >> >
> > > > > > > > > > >> > I will ask other devs to take a look at the proposal. I
> > > will
> > > > > > start
> > > > > > > > the
> > > > > > > > > >> > voting thread tomorrow if there is no further concern
> > with
> > > > the
> > > > > > > SEP.
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks!
> > > > > > > > > >> > Dong
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh
> > (Apache) <
> > > > > > > > > >> > nav...@apache.org>
> > > > > > > > > >> > wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > Hey Dong,
> > > > > > > > > >> > >
> > > > > > > > > >> > > >  I have updated the motivation section to clarify
> > > this.
> > > > > > > > > >> > >
> > > > > > > > > >> > > Thanks for updating the motivation. Couple of notes
> > > here:
> > > > > > > > > >> > >
> > > > > > > > > >> > > 1.
> > > > > > > > > >> > > > "The motivation of increasing partition number of
> > > Kafka
> > > > > > topic
> > > > > > > > > >> includes
> > > > > > > > > >> > 1)
> > > > > > > > > >> > > limit the maximum size of a partition in order to
> > > improve
> > > > > > broker
> > > > > > > > > >> > > performance and 2) increase throughput of Kafka
> > consumer
> > > > in
> > > > > > the
> > > > > > > > > Samza
> > > > > > > > > >> > > container."
> > > > > > > > > >> > >
> > > > > > > > > >> > > It's unclear to me how increasing the partition
> number
> > > > will
> > > > > > > > increase
> > > > > > > > > >> the
> > > > > > > > > >> > > throughput of the kafka consumer in the container?
> > > > > > > Theoretically,
> > > > > > > > > you
> > > > > > > > > >> > will
> > > > > > > > > >> > > still be consuming the same amount of data in the
> > > > container,
> > > > > > > > > >> irrespective
> > > > > > > > > >> > > of whether it is coming from one partition or more
> > than
> > > > one
> > > > > > > > expanded
> > > > > > > > > >> > > partitions. Can you please explain it for me here,
> > what
> > > > you
> > > > > > mean
> > > > > > > > by
> > > > > > > > > >> that?
> > > > > > > > > >> > >
> > > > > > > > > >> > > 2. I believe the second paragraph under motivation
> is
> > > > simply
> > > > > > > > talking
> > > > > > > > > >> > about
> > > > > > > > > >> > > the scope of the current SEP. It will be easier to
> > read
> > > if
> > > > > > what
> > > > > > > > > >> solution
> > > > > > > > > >> > is
> > > > > > > > > >> > > included in this SEP and what is left out as not in
> > > scope.
> > > > > > (for
> > > > > > > > > >> example,
> > > > > > > > > >> > > expansions for stateful jobs is supported or not).
> > > > > > > > > >> > >
> > > > > > > > > >> > > > We need to persist the task-to-sspList mapping in
> > the
> > > > > > > > > >> > > coordinator stream so that the job can derive the
> > > original
> > > > > > > number
> > > > > > > > of
> > > > > > > > > >> > > partitions of each input stream regardless of how
> many
> > > > times
> > > > > > the
> > > > > > > > > >> > partition
> > > > > > > > > >> > > has expanded. Does this make sense?
> > > > > > > > > >> > >
> > > > > > > > > >> > > Yes. It does!
> > > > > > > > > >> > >
> > > > > > > > > >> > > > I am not sure how this is related to the locality
> > > > though.
> > > > > > Can
> > > > > > > > you
> > > > > > > > > >> > clarify
> > > > > > > > > >> > > your question if I haven't answered your question?
> > > > > > > > > >> > >
> > > > > > > > > >> > > It's not related. I just meant to give an example of
> > yet
> > > > > > another
> > > > > > > > > >> > > coordinator message that is persisted. Your
> > ssp-to-task
> > > > > > mapping
> > > > > > > is
> > > > > > > > > >> > > following a similar pattern for persisting. Just
> > wanted
> > > to
> > > > > > > clarify
> > > > > > > > > >> that.
> > > > > > > > > >> > >
> > > > > > > > > >> > > > Can you let me know if this, together with the
> > answers
> > > > in
> > > > > > the
> > > > > > > > > >> previous
> > > > > > > > > >> > > email, addresses all your questions?
> > > > > > > > > >> > >
> > > > > > > > > >> > > Yes. I believe you have addressed most of my
> > questions.
> > > > > Thanks
> > > > > > > for
> > > > > > > > > >> taking
> > > > > > > > > >> > > time to do that.
> > > > > > > > > >> > >
> > > > > > > > > > >> > > > Is there a specific question you have regarding
> > > partition
> > > > > > > > > >> > > expansion in Kafka?
> > > > > > > > > >> > >
> > > > > > > > > >> > > I guess my questions are on how partition expansion
> in
> > > > Kafka
> > > > > > > > impacts
> > > > > > > > > >> the
> > > > > > > > > >> > > clients. Iiuc, partition expansions are done
> manually
> > in
> > > > > Kafka
> > > > > > > > based
> > > > > > > > > >> on
> > > > > > > > > >> > the
> > > > > > > > > >> > > bytes-in rate of the partition. Do the existing
> kafka
> > > > > clients
> > > > > > > > handle
> > > > > > > > > >> this
> > > > > > > > > >> > > expansion automatically? if yes, how does it work?
> If
> > > not,
> > > > > are
> > > > > > > > there
> > > > > > > > > >> > plans
> > > > > > > > > >> > > to support it in the future?
> > > > > > > > > >> > >
> > > > > > > > > >> > > > Thus user's job should not need to bootstrap
> > key/value
> > > > > store
> > > > > > > > from
> > > > > > > > > >> the
> > > > > > > > > >> > > changelog topic.
> > > > > > > > > >> > >
> > > > > > > > > >> > > Why is this discussion relevant here? Key/value
> store
> > /
> > > > > > > changelog
> > > > > > > > > >> topic
> > > > > > > > > > >> > > partition is scoped within the context of a task.
> Since
> > we
> > > > are
> > > > > > not
> > > > > > > > > >> changing
> > > > > > > > > >> > > the number of tasks, I don't think it is required to
> > > > mention
> > > > > > it
> > > > > > > > > here.
> > > > > > > > > >> > >
> > > > > > > > > >> > > > The new method takes the
> > SystemStreamPartition-to-Task
> > > > > > > > assignment
> > > > > > > > > >> from
> > > > > > > > > >> > > the previous job model which can be read from the
> > > > > coordinator
> > > > > > > > > stream.
> > > > > > > > > >> > >
> > > > > > > > > >> > > Jobmodel is currently not persisted to coordinator
> > > stream.
> > > > > In
> > > > > > > your
> > > > > > > > > >> > design,
> > > > > > > > > >> > > you talk about writing separate coordinator messages
> > for
> > > > > > > > ssp-to-task
> > > > > > > > > >> > > assignments. Hence, please correct this statement.
> It
> > is
> > > > > kind
> > > > > > of
> > > > > > > > > >> > misleading
> > > > > > > > > >> > > to the reader.
> > > > > > > > > >> > >
> > > > > > > > > >> > > My biggest gripe with this SEP is that it seems
> like a
> > > > > > > tailor-made
> > > > > > > > > >> > solution
> > > > > > > > > >> > > that relies on the semantics of the Kafka system and
> > > yet,
> > > > we
> > > > > > are
> > > > > > > > > >> trying
> > > > > > > > > >> > to
> > > > > > > > > >> > > masquerade that as operational requirements for
> other
> > > > > systems
> > > > > > > > > >> interacting
> > > > > > > > > >> > > with Samza. (Not to say that this is the first time
> > > such a
> > > > > > > choice
> > > > > > > > is
> > > > > > > > > >> > being
> > > > > > > > > >> > > made in the Samza design). I am not seeing how this
> > can be
> > > a
> > > > > > > > "general"
> > > > > > > > > >> > > solution for all input systems. That's my two
> cents. I
> > > > would
> > > > > > > like
> > > > > > > > to
> > > > > > > > > >> hear
> > > > > > > > > >> > > alternative points of view for this from other devs.
> > > > > > > > > >> > >
> > > > > > > > > >> > > Please make sure you have enough eyes on this SEP.
> If
> > > you
> > > > > do,
> > > > > > > > please
> > > > > > > > > >> > start
> > > > > > > > > >> > > a VOTE thread to approve this SEP.
> > > > > > > > > >> > >
> > > > > > > > > >> > > Thanks!
> > > > > > > > > >> > > Navina
> > > > > > > > > >> > >
> > > > > > > > > >> > >
> > > > > > > > > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <
> > > > > > lindon...@gmail.com
> > > > > > > >
> > > > > > > > > >> wrote:
> > > > > > > > > >> > >
> > > > > > > > > >> > > > Hey Navina,
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > I have updated the wiki based on your suggestion.
> > More
> > > > > > > > > >> specifically, I
> > > > > > > > > >> > > have
> > > > > > > > > >> > > > made the following changes:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > - Improved Problem section and Motivation section
> to
> > > > > > describe
> > > > > > > > why
> > > > > > > > > we
> > > > > > > > > >> > use
> > > > > > > > > >> > > > the solution in this proposal instead of tackling
> > the
> > > > > > problem
> > > > > > > of
> > > > > > > > > >> task
> > > > > > > > > >> > > > expansion directly.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > - Illustrate the design in a way that doesn't bind
> > to
> > > > > Kafka.
> > > > > > > > Kafka
> > > > > > > > > >> is
> > > > > > > > > >> > > only
> > > > > > > > > >> > > > used as example to illustrate why we want to
> support
> > > > > > partition
> > > > > > > > > >> expansion
> > > > > > > > > >> > > and
> > > > > > > > > >> > > > whether the operational requirement can be
> supported
> > > > when
> > > > > > > Kafka
> > > > > > > > is
> > > > > > > > > >> used
> > > > > > > > > >> > > as
> > > > > > > > > >> > > > the input system. Note that the proposed solution
> > > should
> > > > > > work
> > > > > > > > for
> > > > > > > > > >> any
> > > > > > > > > >> > > input
> > > > > > > > > >> > > > system that meets the operational requirement
> > > described
> > > > in
> > > > > > the
> > > > > > > > > wiki.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > - Fixed the problem in the figure.
> > > > > > > > > >> > > >
> > > > > > > > > > >> > > > - Added a new class
> > > > > > > GroupBySystemStreamPartitionFixedTaskNum
> > > > > > > to
> > > > > > > > > the
> > > > > > > > > >> > > wiki.
> > > > > > > > > >> > > > Together with GroupByPartitionFixedTaskNum, it
> > should
> > > > > ensure
> > > > > > > > that
> > > > > > > > > we
> > > > > > > > > >> > > have a
> > > > > > > > > >> > > > solution to enable partition expansion for all
> users
> > > > that
> > > > > > are
> > > > > > > > > using
> > > > > > > > > >> > > > pre-defined grouper in Samza. Note that those
> users
> > > who
> > > > > use
> > > > > > > > custom
> > > > > > > > > >> > > grouper
> > > > > > > > > >> > > > would need to update their implementation.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Can you let me know if this, together with the
> > answers
> > > > in
> > > > > > the
> > > > > > > > > >> previous
> > > > > > > > > >> > > > email, addresses all your questions? Thanks for
> > taking
> > > > > time
> > > > > > to
> > > > > > > > > >> review
> > > > > > > > > >> > the
> > > > > > > > > >> > > > proposal.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Regards,
> > > > > > > > > >> > > > Dong
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <
> > > > > > > lindon...@gmail.com
> > > > > > > > >
> > > > > > > > > >> > wrote:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > > Hey Navina,
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Thanks much for your comments. Please see my
> reply
> > > > > inline.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh
> > > > > (Apache) <
> > > > > > > > > >> > > > > nav...@apache.org> wrote:
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >> Thanks for the SEP, Dong. I have a couple of
> > > > questions
> > > > > to
> > > > > > > > > >> understand
> > > > > > > > > >> > > > your
> > > > > > > > > >> > > > >> proposal better:
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> * Under motivation, you mention that "_We
> expect
> > > this
> > > > > > > > solution
> > > > > > > > > to
> > > > > > > > > >> > work
> > > > > > > > > >> > > > >> similarly with other input system as well._",
> > yet I
> > > > > don't
> > > > > > > see
> > > > > > > > > any
> > > > > > > > > >> > > > >> discussion on how it will work with other input
> > > > > systems.
> > > > > > > That
> > > > > > > > > is,
> > > > > > > > > >> > what
> > > > > > > > > >> > > > >> kind
> > > > > > > > > >> > > > >> of contract does samza expect from other input
> > > > systems
> > > > > ?
> > > > > > If
> > > > > > > > we
> > > > > > > > > >> are
> > > > > > > > > >> > not
> > > > > > > > > >> > > > >> planning to provide a generic solution, it
> might
> > be
> > > > > worth
> > > > > > > > > >> calling it
> > > > > > > > > >> > > out
> > > > > > > > > >> > > > >> in
> > > > > > > > > >> > > > >> the SEP.
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > I think the contract we expect from other
> systems
> > > > is
> > > > > > > > exactly
> > > > > > > > > > the
> > > > > > > > > > >> > > > > operational requirements mentioned in the SEP,
> i.e.
> > > > > > > partitions
> > > > > > > > > >> should
> > > > > > > > > >> > > > always
> > > > > > > > > > >> > > > > be doubled and the hash algorithm should be modulo
> > the
> > > > > number
> > > > > > > of
> > > > > > > > > >> > > partitions.
> > > > > > > > > >> > > > > SEP-5 should also allow partition expansion of
> all
> > > > input
> > > > > > > > systems
> > > > > > > > > >> that
> > > > > > > > > >> > > > meet
> > > > > > > > > >> > > > > these two requirements. I have updated the
> > > motivation
> > > > > > > section
> > > > > > > > to
> > > > > > > > > >> > > clarify
> > > > > > > > > >> > > > > this.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> * I understand the partition mapping logic you
> > have
> > > > > > > proposed.
> > > > > > > > > >> But I
> > > > > > > > > >> > > > think
> > > > > > > > > >> > > > >> the example explanation doesn't match the
> > diagram.
> > > In
> > > > > the
> > > > > > > > > >> diagram,
> > > > > > > > > >> > > after
> > > > > > > > > > >> > > > >> expansion, partition-0 and partition-1 are
> > pointing
> > > to
> > > > > > > bucket
> > > > > > > > 0
> > > > > > > > > >> and
> > > > > > > > > >> > > > >> partition-3 and partition-4 are pointing to
> > bucket
> > > > 1. I
> > > > > > > think
> > > > > > > > > the
> > > > > > > > > >> > > former
> > > > > > > > > >> > > > >> has to be partition-0 and partition-2 and the
> > > latter,
> > > > > is
> > > > > > > > > >> partition-1
> > > > > > > > > >> > > and
> > > > > > > > > >> > > > >> partition-3. If I am wrong, please help me
> > > understand
> > > > > the
> > > > > > > > logic
> > > > > > > > > >> :)
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Good catch. I will update the figure to fix this
> > > > > problem.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> * I don't know how partition expansion in Kafka
> > > > works.
> > > > > I
> > > > > > am
> > > > > > > > > >> familiar
> > > > > > > > > >> > > > with
> > > > > > > > > >> > > > >> how shard splitting happens in Kinesis - there
> is
> > > > > > > > hierarchical
> > > > > > > > > >> > > relation
> > > > > > > > > >> > > > >> between the parent and child shards. This way,
> it
> > > > will
> > > > > > also
> > > > > > > > > allow
> > > > > > > > > >> > the
> > > > > > > > > >> > > > >> shards to be merged back. Iiuc, Kafka only
> > supports
> > > > > > > partition
> > > > > > > > > >> > > > "expansion",
> > > > > > > > > >> > > > >> as opposed to "splits". Can you provide some
> > > context
> > > > or
> > > > > > > link
> > > > > > > > > >> related
> > > > > > > > > >> > > to
> > > > > > > > > >> > > > >> how
> > > > > > > > > >> > > > >> partition expansion works in Kafka?
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > I couldn't find any wiki on partition expansion
> in
> > > > > Kafka.
> > > > > > > The
> > > > > > > > > >> > partition
> > > > > > > > > > >> > > > > expansion logic in Kafka is very simple -- it
> > simply
> > > > > adds
> > > > > > > new
> > > > > > > > > >> > partition
> > > > > > > > > >> > > > to
> > > > > > > > > > >> > > > > the existing topic. Is there a specific question
> you
> > > > have
> > > > > > > > > regarding
> > > > > > > > > >> > > > partition
> > > > > > > > > >> > > > > expansion in Kafka?
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> * Are you only recommending that expansion can
> be
> > > > > > supported
> > > > > > > > for
> > > > > > > > > >> > samza
> > > > > > > > > >> > > > jobs
> > > > > > > > > >> > > > >> that use Kafka as input systems **and**
> configure
> > > the
> > > > > > > > > SSPGrouper
> > > > > > > > > >> as
> > > > > > > > > >> > > > >> GroupByPartitionFixedTaskNum? Sounds to me like
> > > this
> > > > > only
> > > > > > > > > applies
> > > > > > > > > >> > for
> > > > > > > > > >> > > > >> GroupByPartition. Please correct me if I am
> > wrong.
> > > > What
> > > > > > is
> > > > > > > > the
> > > > > > > > > >> > > > expectation
> > > > > > > > > >> > > > >> for custom SSP Groupers?
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > The expansion can be supported for Samza jobs if
> > the
> > > > > input
> > > > > > > > > system
> > > > > > > > > >> > meets
> > > > > > > > > >> > > > > the operational requirement mentioned above. It
> > > > doesn't
> > > > > > have
> > > > > > > > to
> > > > > > > > > >> use
> > > > > > > > > >> > > Kafka
> > > > > > > > > >> > > > > as input system.
> > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > The current proposal provides a solution for jobs
> > that
> > > > > > > currently
> > > > > > > > > use
> > > > > > > > > >> > > > > GroupByPartition. The proposal can be extended
> to
> > > > > support
> > > > > > > jobs
> > > > > > > > > >> that
> > > > > > > > > >> > use
> > > > > > > > > > >> > > > > other groupers that are pre-defined in Samza. The
> > > > custom
> > > > > > SSP
> > > > > > > > > >> grouper
> > > > > > > > > >> > > needs
> > > > > > > > > >> > > > > to handle partition expansion similar to how
> > > > > > > > > >> > > GroupByPartitionFixedTaskNum
> > > > > > > > > >> > > > > handles it and it is users' responsibility to
> > update
> > > > > their
> > > > > > > > > custom
> > > > > > > > > >> > > grouper
> > > > > > > > > >> > > > > implementation.
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> * Regarding storing SSP-to-Task assignment to
> > > > > coordinator
> > > > > > > > > stream:
> > > > > > > > > >> > > Today,
> > > > > > > > > >> > > > >> the JobModel encapsulates the data model in
> samza
> > > > which
> > > > > > > also
> > > > > > > > > >> > includes
> > > > > > > > > >> > > > >> **TaskModels**. TaskModel, typically shows the
> > > > > > > > task-to-sspList
> > > > > > > > > >> > > mapping.
> > > > > > > > > >> > > > >> What is the reason for using a separate
> > coordinator
> > > > > > stream
> > > > > > > > > >> message
> > > > > > > > > >> > > > >> *SetSSPTaskMapping*? Is it because the JobModel
> > > > itself
> > > > > is
> > > > > > > not
> > > > > > > > > >> > > persisted
> > > > > > > > > >> > > > in
> > > > > > > > > >> > > > >> the coordinator stream today?  The reason
> > locality
> > > > > exists
> > > > > > > > > >> outside of
> > > > > > > > > >> > > the
> > > > > > > > > >> > > > >> jobmodel is because *locality* information is
> > > written
> > > > > by
> > > > > > > each
> > > > > > > > > >> > > container,
> > > > > > > > > > >> > > > >> whereas it is consumed only by the leader
> > > > > > > jobcoordinator/AM.
> > > > > > > > > In
> > > > > > > > > >> > this
> > > > > > > > > >> > > > >> case,
> > > > > > > > > >> > > > >> the writer of the mapping information and the
> > > reader
> > > > is
> > > > > > > still
> > > > > > > > > the
> > > > > > > > > >> > > leader
> > > > > > > > > >> > > > >> jobcoordinator/AM. So, I want to understand the
> > > > > > motivation
> > > > > > > > for
> > > > > > > > > >> this
> > > > > > > > > >> > > > >> choice.
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > Yes, the reason for using a separate coordinator stream
> > > > > > > > > > >> > > > > message is that the task-to-sspList mapping is not
> > > > > > > > > > >> > > > > currently persisted in the coordinator stream. We
> > > > > > > > > > >> > > > > wouldn't need to create this new stream message if the
> > > > > > > > > > >> > > > > JobModel were persisted. We need to persist the
> > > > > > > > > > >> > > > > task-to-sspList mapping in the coordinator stream so
> > > > > > > > > > >> > > > > that the job can derive the original number of
> > > > > > > > > > >> > > > > partitions of each input stream regardless of how many
> > > > > > > > > > >> > > > > times its partitions have been expanded. Does this make
> > > > > > > > > > >> > > > > sense?
> > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > I am not sure how this is related to locality, though.
> > > > > > > > > > >> > > > > Can you clarify your question if I haven't answered it?
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > Thanks!
> > > > > > > > > >> > > > > Dong
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> Cheers!
> > > > > > > > > >> > > > >> Navina
> > > > > > > > > >> > > > >>
> > > > > > > > > > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin
> > > > > > > > > > >> > > > >> <lindon...@gmail.com> wrote:
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >> > Hi all,
> > > > > > > > > >> > > > >> >
> > > > > > > > > > >> > > > >> > We created SEP-5: Enable partition expansion of input
> > > > > > > > > > >> > > > >> > streams. Please find the SEP wiki in the link
> > > > > > > > > > >> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
> > > > > > > > > >> > > > >> >
> > > > > > > > > > >> > > > >> > Your feedback is appreciated!
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >> > Thanks,
> > > > > > > > > >> > > > >> > Dong
> > > > > > > > > >> > > > >> >
> > > > > > > > > >> > > > >>
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
