BTW, I will update the SEP-5 wiki with our latest discussion once I have
wiki edit access.

On Sat, Jun 17, 2017 at 11:36 PM, Dong Lin <lindon...@gmail.com> wrote:

> Thanks everyone for the comments!
>
> I am currently leaning towards the current approach. I think Kartik raised
> a good point that the extra repartitioning stage will also put additional
> load on Kafka, on top of the potential storage cost. Could other Samza
> developers also chime in with their opinions on this proposal?
>
> Since this discussion thread has been open for three weeks, I will start
> a voting thread on Monday if no major revision is suggested.
>
> Thanks,
> Dong
>
>
> On Thu, Jun 15, 2017 at 6:32 PM, Kartik Paramasivam <
> kparamasi...@linkedin.com.invalid> wrote:
>
>> Great discussion!
>>
>> Here are some more thoughts:
>>
>> The point that repartitioning is a more general-purpose solution is
>> surely spot on. For many source systems (Kinesis, Google Pub-Sub, and
>> older queuing systems such as RabbitMQ), repartitioning is functionally
>> required anyway to do even simple keyed aggregations. But in most of
>> these systems, the concept of repartitioning either does not exist or
>> exists in a form specific to that system (e.g. Kinesis).
>>
>> I think this feature is really only interesting for source systems like
>> Kafka and EventHub.  EventHub (last I checked) didn't support
>> repartitioning. So this is probably not super-interesting (yet) for
>> EventHub.
>>
>> So Kafka is clearly the main use case here.
>>
>> For Kafka, I think it is pretty rare for people to customize the hashing
>> algorithm for sending messages. I would argue that less than 5% of the
>> population (I am being generous ;)) would do that. The current proposal
>> works with the default hashing scheme for Kafka. So organizations will
>> typically never have to coordinate.
>>
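A rough illustration of why the default hash+modulo scheme matters here: when the partition count doubles from N to 2N, a key that used to land in partition p can only land in p or p + N, so folding the new partitions back onto the original task set keeps every key on its original task. The sketch below is a model under stated assumptions, not the SEP-5 implementation: `zlib.crc32` stands in for the producer's hash (Kafka's Java client uses murmur2 for keyed messages), and `task_for` is a hypothetical stand-in for a fixed-task-count grouper.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    # Stand-in for a hash+modulo partitioner. crc32 is used only because it
    # is stable across runs; it is NOT the hash Kafka uses.
    return zlib.crc32(key) % num_partitions


def task_for(partition: int, fixed_task_count: int) -> int:
    # Hypothetical grouping rule: fold every partition back onto the task
    # count the job had before the expansion.
    return partition % fixed_task_count


def keys_changing_task(keys, old_count: int, new_count: int) -> int:
    # Count keys whose owning task differs before and after the expansion.
    moved = 0
    for key in keys:
        old_task = task_for(partition_for(key, old_count), old_count)
        new_task = task_for(partition_for(key, new_count), old_count)
        if old_task != new_task:
            moved += 1
    return moved


KEYS = [f"member-{i}".encode() for i in range(1000)]

# Doubling 4 -> 8: (h % 8) % 4 == h % 4, so no key changes task.
print(keys_changing_task(KEYS, 4, 8))

# Expanding 4 -> 6 is not a doubling, so some keys end up on another task
# and would no longer find their local state.
print(keys_changing_task(KEYS, 4, 6))
```

The second call shows the kind of silent breakage Jake is worried about when one of the assumptions (doubling, hash+modulo) is violated by the team that owns the topic.
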
>> If the proposed alternative (always repartition) were free of side
>> effects, it would make sense to use an alternative design that works for
>> 100% of the population. Repartitioning all input would, however, not be a
>> feasible solution (at least at LinkedIn), as it would double the Kafka
>> workload. If many Samza jobs read from Kafka topics, the increase would
>> be a function of the number of Samza jobs.
>>
>> For low-throughput Kafka topics, explicit repartitioning using the
>> fluent API is surely feasible.
>>
>> If the proposal was to make this new policy the default then that would
>> clearly not make much sense.
>>
>> But it is an opt in policy.  If it is not applicable, people don't have to
>> use it.
>>
>> I do have some questions about the implementation. I will try to respond
>> back after spending some more time on this.
>>
>>
>>
>> On Thu, Jun 15, 2017 at 7:53 AM, Jacob Maes <jacob.m...@gmail.com> wrote:
>>
>> > Thanks, Dong.
>> >
>> > The summary looks accurate.
>> >
>> > I'll let the others chime in, as I believe my perspective has been
>> > adequately captured in this thread.
>> >
>> > -Jake
>> >
>> > On Wed, Jun 14, 2017 at 12:12 PM, Dong Lin <lindon...@gmail.com> wrote:
>> >
>> > > Hey Jacob,
>> > >
>> > > Thank you for taking so much time to discuss with me! I appreciate the
>> > > discussion and the insight. I will summarize our discussion below.
>> > >
>> > > 1) Whether it is reasonable to store partition-to-task mapping.
>> > >
>> > > We agree that this partition-to-task mapping will be reasonable if
>> > > we allow users to specify either the new-partition-to-old-partition
>> > > mapping or the key-to-partition mapping in the future. SEP-5 doesn't
>> > > currently provide a way for users to specify the
>> > > new-partition-to-old-partition mapping because we won't have a good
>> > > idea of that interface until we try to enable partition expansion for
>> > > an input system other than Kafka. This is currently listed as the
>> > > third item of future work in SEP-5.
>> > >
>> > > And if we decide to implement SEP-5, I will include a warning message
>> > > regarding the use of partition-to-task, i.e. "this does not specify
>> > > the key-to-task mapping". We agree that this could address the
>> > > concern here.
>> > >
>> > > 2) Whether we should follow the approach in SEP-5 or use an extra
>> > > re-partitioning stage in the stateful Samza job to enable partition
>> > > expansion.
>> > >
>> > > Here are the pros and cons of the extra re-partitioning stage in
>> > > comparison to SEP-5.
>> > >
>> > > Pros:
>> > > - It doesn't require the owner of the Samza job to know the
>> > > partitioning algorithm used for the input stream. If the owner of the
>> > > Samza job is in a different organization than the producer of the
>> > > input stream, this solution frees the organizations from having to
>> > > coordinate with each other.
>> > > - It doesn't require the owner of the Samza job to specify the
>> > > partitioning algorithm used for the input stream. Thus less config.
>> > >
>> > > Cons:
>> > > - Users have to make code changes on their side to use the new
>> > > fluent API.
>> > > - The extra partitioning stage would potentially increase latency.
>> > > - The extra partitioning stage would incur additional cost due to the
>> > > extra internal topic. The cost is probably not that much with the new
>> > > trim() API in Kafka if Samza uses Kafka to store the internal topic.
>> > > But the cost may be doubled if Samza uses another input system that
>> > > doesn't provide a trim() API to delete data on demand.
>> > >
>> > > My recommendation is to adopt a hybrid solution, i.e. we still
>> > > implement the current proposal in SEP-5 so that we enable partition
>> > > expansion without incurring extra latency/cost and without requiring
>> > > users to change their code. And we can recommend that users add the
>> > > extra partitioning stage if coordination among different
>> > > organizations is indeed a concern.
>> > >
>> > > Can other developers also provide feedback regarding your preference
>> > > between the two?
>> > >
>> > > Thanks,
>> > > Dong
>> > >
>> > >
>> > >
>> > >
>> > > On Tue, Jun 13, 2017 at 9:30 AM, Jacob Maes <jacob.m...@gmail.com>
>> > wrote:
>> > >
>> > > > Hey Dong,
>> > > >
>> > > > I appreciate your thoughtful responses. Let's do one more round :-)
>> > > >
>> > > >
>> > > > > Here are my current concerns with the three alternatives you
>> > > > > described earlier:
>> > > > > - The first alternative requires support from the input system,
>> > > > > which is currently not available. It will limit the usage of
>> > > > > partition expansion to only systems that support such an
>> > > > > interface. And it is not guaranteed that we can persuade the
>> > > > > developers of the input system to add this interface. This is
>> > > > > not desirable for Samza in the long term.
>> > > >
>> > > > Agreed. It is very wishful thinking that each supported system would
>> > > > provide such a contract.
>> > > >
>> > > >
>> > > > >
>> > > > > - I cannot comment on the second alternative because I don't
>> > > > > understand how it reshuffles all existing changelog data. We can
>> > > > > discuss more if there is more specific detail. My gut feel is
>> > > > > that this will be complex and carry performance overhead.
>> > > >
>> > > > After giving this more thought, I agree. There is no clear way to
>> > > > migrate a changelog without knowing the original key->partition
>> > > > mapping. Which leads us to alternative 3...
>> > > >
>> > > >
>> > > > >
>> > > > > - The third alternative incurs performance overhead. Given that
>> > > > > users can already use this solution to enable partition
>> > > > > expansion, maybe Samza developers can provide more input as to
>> > > > > why we are not doing it by default. My gut feel is that it
>> > > > > carries considerable performance overhead and increases the
>> > > > > cost-to-serve of Samza jobs (e.g. disk usage), which may make it
>> > > > > undesirable in the long term.
>> > > >
>> > > > I think the only performance overhead would be the mandatory
>> > > > repartitioning stage for stateful jobs. But a repartitioner is
>> > > > usually much faster than the downstream stateful job, so it seems
>> > > > to be only a cost-to-serve issue.
>> > > >
>> > > > As for why we aren't already doing this, I would posit that before
>> > > > the introduction of the high-level API, which trivializes
>> > > > repartitioning, it was unreasonable to expect each job owner to do
>> > > > the mandatory repartitioning. With the high-level API, I would
>> > > > argue this is much more doable.
>> > > >
>> > > >
>> > > > > I am not sure it is true that "any future feature that utilizes
>> > > > > this mapping without accounting for the assumptions of this SEP
>> > > > > is likely to malfunction". Suppose we allow users to specify the
>> > > > > new-to-old-partition mapping; then we can use the
>> > > > > partition-to-task mapping correctly without relying on the
>> > > > > assumption in this SEP, right?
>> > > >
>> > > > Right, but my point was that the partition->task mapping is not
>> > > > sufficient by itself. So adding it by itself is potentially
>> > > > misleading.
>> > > >
>> > > > On Mon, Jun 12, 2017 at 8:34 PM, Dong Lin <lindon...@gmail.com>
>> wrote:
>> > > >
>> > > > > Thanks for the reply Jacob. Please see my comment inline.
>> > > > >
>> > > > > On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <jacob.m...@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > > > >
>> > > > > > > - For users that need partition expansion of the input
>> > > > > > > streams for stateful jobs, they have a really big headache in
>> > > > > > > the sense that Samza does not allow partition expansion for
>> > > > > > > stateful jobs. SEP-5 addresses this headache for them.
>> > > > > > >
>> > > > > > > You are right that SEP-5 requires users to understand and
>> > > > > > > enforce limitations across organizations. But it is still
>> > > > > > > much better than not allowing users to expand partitions for
>> > > > > > > stateful jobs at all, right? Did I miss something here?
>> > > > > >
>> > > > > > I guess this one is a matter of perspective.
>> > > > > >
>> > > > > > One argument is that if the system supports one case, it's
>> > > > > > better than none because there is one less scenario in which
>> > > > > > the system does the wrong thing.
>> > > > > >
>> > > > > > The counterargument is for uniform and consistent behavior,
>> > > > > > which is easy for users to understand and properly leverage.
>> > > > > >
>> > > > > > Specifically, I'd argue that the current rule is very simple:
>> > > > > > "you cannot repartition inputs on a stateful job, so you must
>> > > > > > over-partition the initial implementation". To me, while that
>> > > > > > rule is not ideal, its simplicity is better than introducing a
>> > > > > > new solution that has a bunch of caveats, any one of which
>> > > > > > could be missed. If any one of the assumptions in this SEP
>> > > > > > design is violated, the job would behave incorrectly. That
>> > > > > > puts a lot more burden on the users than the simpler rule.
>> > > > > >
>> > > > >
>> > > > > I agree that we have different perspectives here. It is true that
>> > > > > users would mess up their job if they used this feature in the
>> > > > > wrong way, i.e. violated the assumptions made in SEP-5. On the
>> > > > > other hand, I think there is always a way for users to mess up
>> > > > > their job if they configure the Samza job incorrectly. I also
>> > > > > think the assumptions made in this SEP are not particularly
>> > > > > harder to understand than other existing configs in Samza.
>> > > > >
>> > > > > The answer to this can be subjective. I would love to hear
>> > > > > perspectives from other developers on this issue.
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > That's why I mentioned a few alternatives that, while more
>> > > > > > complex to implement, would provide more consistent behavior
>> > > > > > with simple rules for the users.
>> > > > > >
>> > > > >
>> > > > > I am open to discussing alternative solutions that can address
>> > > > > the problem in a better manner. I am not opposed to complexity as
>> > > > > long as it gives us good long-term benefits.
>> > > > >
>> > > > > Here are my current concerns with the three alternatives you
>> > > > > described earlier:
>> > > > >
>> > > > > - The first alternative requires support from the input system,
>> > > > > which is currently not available. It will limit the usage of
>> > > > > partition expansion to only systems that support such an
>> > > > > interface. And it is not guaranteed that we can persuade the
>> > > > > developers of the input system to add this interface. This is
>> > > > > not desirable for Samza in the long term.
>> > > > >
>> > > > > - I cannot comment on the second alternative because I don't
>> > > > > understand how it reshuffles all existing changelog data. We can
>> > > > > discuss more if there is more specific detail. My gut feel is
>> > > > > that this will be complex and carry performance overhead.
>> > > > >
>> > > > > - The third alternative incurs performance overhead. Given that
>> > > > > users can already use this solution to enable partition
>> > > > > expansion, maybe Samza developers can provide more input as to
>> > > > > why we are not doing it by default. My gut feel is that it
>> > > > > carries considerable performance overhead and increases the
>> > > > > cost-to-serve of Samza jobs (e.g. disk usage), which may make it
>> > > > > undesirable in the long term.
>> > > > >
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > > Yes, we need a similar check for
>> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. If more
>> > > > > > > grouper classes are needed in the future, we can solve this
>> > > > > > > problem cleanly without a new config. Given the
>> > > > > > > previousGrouperClass and newGrouperClass,
>> > > > > > > KafkaCheckpointLogKey will throw an exception if and only if
>> > > > > > > newGrouperClass is not a subclass of previousGrouperClass.
>> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend
>> > > > > > > GroupBySystemStreamPartition and
>> > > > > > > GroupByPartitionWithFixedTaskNum should extend
>> > > > > > > GroupByPartition. Does this address your concern?
>> > > > > >
>> > > > > > Sounds workable, thanks.
>> > > > > >
>> > > > > > >
>> > > > > > > Can you be more specific why the partition-to-task mapping is
>> > > > > > > not meaningful without some definition of the
>> > > > > > > key-to-partition assignments and why it is incomplete and
>> > > > > > > misleading?
>> > > > > >
>> > > > > > A partition is (in my naive interpretation) an independent
>> > > > > > queue for messages of a particular key set. It is not the
>> > > > > > *identity* of the partition that determines the contents of the
>> > > > > > associated task's local state. Rather, it is the *contents* of
>> > > > > > the partition that affect the task's state. A partition-to-task
>> > > > > > mapping only captures an identity relationship:
>> > > > > > partition1->task1. Without the assumptions of this SEP, this is
>> > > > > > insufficient to determine the assignment of keys to tasks,
>> > > > > > which is what really matters. Therefore, any future feature
>> > > > > > that utilizes this mapping without accounting for the
>> > > > > > assumptions of this SEP is likely to malfunction.
>> > > > > >
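To make the identity-versus-contents distinction concrete, here is a small sketch in which the partition-to-task mapping is held fixed while the producer's key-to-partition function changes. Everything in it is illustrative: the task names, the four-partition layout, and the two hash functions (`zlib.crc32` and `zlib.adler32`) are assumptions chosen only because they are deterministic, not anything Samza or Kafka uses.

```python
import zlib

# The stored mapping: identical in both scenarios below.
PARTITION_TO_TASK = {0: "task0", 1: "task1", 2: "task2", 3: "task3"}


def crc_partitioner(key: bytes) -> int:
    # One possible key-to-partition function (illustrative only).
    return zlib.crc32(key) % len(PARTITION_TO_TASK)


def adler_partitioner(key: bytes) -> int:
    # A different key-to-partition function over the same partitions.
    return zlib.adler32(key) % len(PARTITION_TO_TASK)


def key_to_task(key: bytes, partitioner) -> str:
    # The key-to-task assignment is the composition of both mappings.
    return PARTITION_TO_TASK[partitioner(key)]


KEYS = [f"member-{i}".encode() for i in range(200)]

# Keys whose owning task depends on which partitioner the producer used,
# even though PARTITION_TO_TASK never changed.
disagreements = [
    k for k in KEYS
    if key_to_task(k, crc_partitioner) != key_to_task(k, adler_partitioner)
]
print(len(disagreements), "of", len(KEYS), "keys land on a different task")
```

The stored partition-to-task mapping is the same in both cases, so by itself it cannot tell a reader which task holds the state for a given key.
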
>> > > > > >
>> > > > > I am not sure it is true that "any future feature that utilizes
>> > > > > this mapping without accounting for the assumptions of this SEP
>> > > > > is likely to malfunction". Suppose we allow users to specify the
>> > > > > new-to-old-partition mapping; then we can use the
>> > > > > partition-to-task mapping correctly without relying on the
>> > > > > assumption in this SEP, right?
>> > > > >
>> > > > >
>> > > > > >
>> > > > > > On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com>
>> > > wrote:
>> > > > > >
>> > > > > > > Hey Jacob,
>> > > > > > >
>> > > > > > > Thanks for the explanation. It seems that your biggest concern
>> > > > > > > is with the generality of the proposal. Let me try to address
>> > > > > > > this and other comments below.
>> > > > > > >
>> > > > > > > 1) ... it will cause headaches for Samza users ...
>> > > > > > >
>> > > > > > > I am not sure I understand why this proposal causes headaches
>> > > > > > > for Samza users. Here is the impact of SEP-5 on users:
>> > > > > > >
>> > > > > > > - For users that do not need partition expansion of the input
>> > > > > > > stream, they can use Samza without any change in
>> > > > > > > code/binary/config. Thus there is no headache for them.
>> > > > > > >
>> > > > > > > - For users that need partition expansion of the input
>> > > > > > > streams for stateless jobs, they currently need to manually
>> > > > > > > reboot their Samza job in order to let Samza consume the new
>> > > > > > > partitions created for the stream. SEP-5 actually reduces
>> > > > > > > their headache by allowing Samza to automatically detect and
>> > > > > > > consume new partitions.
>> > > > > > >
>> > > > > > > - For users that need partition expansion of the input
>> > > > > > > streams for stateful jobs, they have a really big headache in
>> > > > > > > the sense that Samza does not allow partition expansion for
>> > > > > > > stateful jobs. SEP-5 addresses this headache for them.
>> > > > > > >
>> > > > > > > You are right that SEP-5 requires users to understand and
>> > > > > > > enforce limitations across organizations. But it is still
>> > > > > > > much better than not allowing users to expand partitions for
>> > > > > > > stateful jobs at all, right? Did I miss something here?
>> > > > > > >
>> > > > > > > 2) ... Separate orgs are often difficult to coordinate and a
>> > > > > > > system which depends on such significant process/coordination
>> > > > > > > is too fragile for my taste ..
>> > > > > > >
>> > > > > > > This is true. Ideally we want a system that is fully
>> > > > > > > self-serving. I think this is a long-term goal for Samza.
>> > > > > > > Still, for the reasons described above, I think something is
>> > > > > > > better than nothing. I am open to alternative designs that can
>> > > > > > > support partition expansion for stateful jobs without
>> > > > > > > requiring coordination.
>> > > > > > >
>> > > > > > > 3) There is currently no supported way of sharing state among
>> > > > > > > the tasks of a container. Each task has its own isolated
>> > > > > > > store and that logical isolation is the primary thing that
>> > > > > > > enables Samza jobs to scale with a simple container count
>> > > > > > > change. My feeling is that we should not change this without
>> > > > > > > good reason.
>> > > > > > >
>> > > > > > > I see your point. I will remove this sentence from the
>> > > > > > > motivation section. This won't have any impact on the design
>> > > > > > > of SEP-5. Does this address the problem?
>> > > > > > >
>> > > > > > > 4) With the current proposal, we'd also need a similar check
>> > > > > > > for GroupBySystemStreamPartitionWithFixedTaskNum as well. And
>> > > > > > > if any other groupers were later added with both these modes,
>> > > > > > > we'd probably need to add those too. It might be easier and
>> > > > > > > cleaner to add a config to ignore that check temporarily.
>> > > > > > > Down side is that it further complicates the Samza config,
>> > > > > > > which is already huge. Thoughts?
>> > > > > > >
>> > > > > > > Yes, we need a similar check for
>> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum as well. If more
>> > > > > > > grouper classes are needed in the future, we can solve this
>> > > > > > > problem cleanly without a new config. Given the
>> > > > > > > previousGrouperClass and newGrouperClass,
>> > > > > > > KafkaCheckpointLogKey will throw an exception if and only if
>> > > > > > > newGrouperClass is not a subclass of previousGrouperClass.
>> > > > > > > GroupBySystemStreamPartitionWithFixedTaskNum should extend
>> > > > > > > GroupBySystemStreamPartition and
>> > > > > > > GroupByPartitionWithFixedTaskNum should extend
>> > > > > > > GroupByPartition. Does this address your concern?
>> > > > > > >
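A minimal sketch of the grouper compatibility check, reading the rule as "allow the switch only when the new grouper extends the previous one", which is what the GroupByPartition example elsewhere in this thread implies. The classes below are empty Python stand-ins that only borrow the names from the discussion, and `GrouperChangedError` and `check_grouper_change` are hypothetical names; the real check would live in KafkaCheckpointLogKey and is not shown here.

```python
class GroupByPartition:
    """Stand-in for the existing grouper."""


class GroupByPartitionWithFixedTaskNum(GroupByPartition):
    """Stand-in for the proposed grouper; extends the existing one."""


class GroupBySystemStreamPartition:
    """Stand-in for the existing per-SSP grouper."""


class GroupBySystemStreamPartitionWithFixedTaskNum(GroupBySystemStreamPartition):
    """Stand-in for the proposed per-SSP grouper."""


class GrouperChangedError(Exception):
    """Hypothetical error for an incompatible grouper change."""


def check_grouper_change(previous_cls: type, new_cls: type) -> None:
    # Accept the change only if the new grouper is the previous grouper
    # or a subclass of it; anything else is treated as incompatible.
    if not issubclass(new_cls, previous_cls):
        raise GrouperChangedError(
            f"{new_cls.__name__} does not extend {previous_cls.__name__}"
        )


def is_allowed(previous_cls: type, new_cls: type) -> bool:
    # Convenience wrapper that reports the outcome as a boolean.
    try:
        check_grouper_change(previous_cls, new_cls)
        return True
    except GrouperChangedError:
        return False


print(is_allowed(GroupByPartition, GroupByPartitionWithFixedTaskNum))
print(is_allowed(GroupByPartitionWithFixedTaskNum, GroupByPartition))
```

With a subclass test, a grouper added later is covered by the same rule without a new config switch, as long as it extends the grouper it replaces.
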
>> > > > > > > 5) The task-to-container and container-to-host mappings are
>> > > > > > > both meaningful in context of the JobModel. Partition-to-task
>> > > > > > > mapping is not meaningful without some definition of the
>> > > > > > > key-to-partition assignments. It's incomplete information and
>> > > > > > > therefore misleading. I think it only makes sense to use this
>> > > > > > > mapping if we adopt a solution wherein Samza also knows the
>> > > > > > > partition key assignment.
>> > > > > > >
>> > > > > > > Partition-to-task is currently explicitly passed from the job
>> > > > > > > coordinator to each task as part of the job model to tell
>> > > > > > > tasks which partitions to consume from. I think we can store
>> > > > > > > some definition of the key-to-partition assignments if Samza
>> > > > > > > decides to get and use this information in the future. Can
>> > > > > > > you be more specific why the partition-to-task mapping is not
>> > > > > > > meaningful without some definition of the key-to-partition
>> > > > > > > assignments and why it is incomplete and misleading?
>> > > > > > >
>> > > > > > >
>> > > > > > > Thanks,
>> > > > > > > Dong
>> > > > > > >
>> > > > > > > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <
>> > jacob.m...@gmail.com>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Hey Dong,
>> > > > > > > >
>> > > > > > > > I'm opposed (or a +0, at best) to this limited,
>> > > > > > > > Kafka-specific solution. I understand that the proposal is
>> > > > > > > > relatively simple to implement, but I think it will cause
>> > > > > > > > headaches for Samza users. They will not only have to
>> > > > > > > > understand all the limitations (increase only, double
>> > > > > > > > partitions only, partition using hash+modulo, etc) of this
>> > > > > > > > approach, but enforcing these limitations can be a major
>> > > > > > > > problem, especially when the Samza jobs and message brokers
>> > > > > > > > are managed by separate orgs in a company. Separate orgs are
>> > > > > > > > often difficult to coordinate and a system which depends on
>> > > > > > > > such significant process/coordination is too fragile for my
>> > > > > > > > taste.
>> > > > > > > >
>> > > > > > > > That said, I realize that my opinion is just one of many in
>> > > > > > > > the broader community which may feel differently, so let me
>> > > > > > > > respond to some of the other items in the discussion so we
>> > > > > > > > can clear them up:
>> > > > > > > >
>> > > > > > > > > The task-to-container assignment matters because
>> > > > > > > > > correlated tasks (i.e. tasks that consume messages with
>> > > > > > > > > the same key) need to be in the same container so that
>> > > > > > > > > they can share the same key/value local store on the
>> > > > > > > > > same physical machine.
>> > > > > > > >
>> > > > > > > > There is currently no supported way of sharing state among
>> > > > > > > > the tasks of a container. Each task has its own isolated
>> > > > > > > > store and that logical isolation is the primary thing that
>> > > > > > > > enables Samza jobs to scale with a simple container count
>> > > > > > > > change. My feeling is that we should not change this
>> > > > > > > > without good reason.
>> > > > > > > >
>> > > > > > > > > I think we can hardcode new logic in
>> > > > > > > > > KafkaCheckpointLogKey.scala such that the exception will
>> > > > > > > > > not be thrown if the new grouper is
>> > > > > > > > > GroupByPartitionWithFixedTaskNum and the old grouper is
>> > > > > > > > > GroupByPartition. Does this look reasonable?
>> > > > > > > >
>> > > > > > > > With the current proposal, we'd also need a similar check
>> > > > > > > > for GroupBySystemStreamPartitionWithFixedTaskNum as well.
>> > > > > > > > And if any other groupers were later added with both these
>> > > > > > > > modes, we'd probably need to add those too. It might be
>> > > > > > > > easier and cleaner to add a config to ignore that check
>> > > > > > > > temporarily. Down side is that it further complicates the
>> > > > > > > > Samza config, which is already huge. Thoughts?
>> > > > > > > >
>> > > > > > > > > I think storing the previous task-to-partition mapping
>> > > > > > > > > is more general than storing the partition count of all
>> > > > > > > > > topics for the following reasons:
>> > > > > > > > > - Samza already stores the task-to-container mapping and
>> > > > > > > > > container-to-host mapping in the coordinator stream. It
>> > > > > > > > > seems consistent to also store the partition-to-task
>> > > > > > > > > mapping. And this information may be useful for other
>> > > > > > > > > use cases such as debugging.
>> > > > > > > > > - By having the new interface take the previous
>> > > > > > > > > task-to-partition assignment instead of a
>> > > > > > > > > topic-to-partition-count mapping as a new parameter, we
>> > > > > > > > > can potentially have grouper implementations that
>> > > > > > > > > support other types of input systems.
>> > > > > > > > > - It is slightly simpler to store the task-to-partition
>> > > > > > > > > assignment because we don't need to know whether this is
>> > > > > > > > > the first time a job is started or not. On the other
>> > > > > > > > > hand, you can write the topic-to-partition-count mapping
>> > > > > > > > > to the coordinator stream only if this is the first time
>> > > > > > > > > the job is run
>> > > > > > > >
>> > > > > > > > The task-to-container and container-to-host mappings are
>> > > > > > > > both meaningful in context of the JobModel.
>> > > > > > > > Partition-to-task mapping is not meaningful without some
>> > > > > > > > definition of the key-to-partition assignments. It's
>> > > > > > > > incomplete information and therefore misleading. I think it
>> > > > > > > > only makes sense to use this mapping if we adopt a solution
>> > > > > > > > wherein Samza also knows the partition key assignment.
>> > > > > > > >
>> > > > > > > > -Jake
>> > > > > > > >
>> > > > > > > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <
>> lindon...@gmail.com
>> > >
>> > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Hey Jacob,
>> > > > > > > > >
>> > > > > > > > > Thanks for taking time to review the SEP.
>> > > > > > > > >
>> > > > > > > > > I agree with you and Navina that the current SEP doesn't
>> > > > > > > > > provide support for arbitrary input systems and it
>> > > > > > > > > doesn't support partition shrink. I think the scope of
>> > > > > > > > > this SEP is to support partition expansion for Kafka
>> > > > > > > > > (the most widely used input system of Samza) and keep
>> > > > > > > > > the door open for partition support of various input
>> > > > > > > > > systems. The current design can support any system that
>> > > > > > > > > meets the two operational requirements specified in the
>> > > > > > > > > doc.
>> > > > > > > > >
>> > > > > > > > > While it is possible to support more types of input
>> > > > > > > > > systems, it will likely add more complexity to the
>> > > > > > > > > design. For example, the first alternative solution from
>> > > > > > > > > you requires broker-side support to negotiate the hash
>> > > > > > > > > algorithm. The second alternative solution requires
>> > > > > > > > > changelog partition reshuffle, which carries its own
>> > > > > > > > > design complexity and performance overhead. There is a
>> > > > > > > > > tradeoff between generality and complexity among these
>> > > > > > > > > choices. I like the current design because it is simple
>> > > > > > > > > and addresses a big usage scenario for us. We can add
>> > > > > > > > > more complexity to generalize the design if it enables
>> > > > > > > > > an important use case. Does this sound reasonable?
>> > > > > > > > >
>> > > > > > > > > Note that the "Rejected Alternative" section also
>> > > > > > > > > mentions the possibility of supporting a wider range of
>> > > > > > > > > input systems by allowing users to specify the
>> > > > > > > > > new-partition to old-partition mapping. We are not doing
>> > > > > > > > > it because 1) we may have a better understanding of the
>> > > > > > > > > design after we have a specific second input system to
>> > > > > > > > > support and 2) the current design can be extended to
>> > > > > > > > > support general input systems. I think a similar
>> > > > > > > > > argument can be applied to explain why we don't have to
>> > > > > > > > > support general input systems using the potentially good
>> > > > > > > > > alternatives you mentioned.
>> > > > > > > > >
>> > > > > > > > > I hope SEP-5 can be an important first step towards
>> > > > > > > > > supporting partition expansion of any input system.
>> > > > > > > > >
>> > > > > > > > > To answer your questions about the current proposal:
>> > > > > > > > >
>> > > > > > > > > >1. "An alternative solution is to allow task number to
>> > > > > > > > > >increase after partition expansion and uses a proper
>> > > > > > > > > >task-to-container assignment to make sure the Samza
>> > > > > > > > > >output is correct." What does the container have to do
>> > > > > > > > > >with stateful processing or output in general?
>> > > > > > > > >
>> > > > > > > > > The task-to-container assignment matters because if the
>> > > > correlated
>> > > > > > > tasks
>> > > > > > > > > (i.e. tasks that consume messages with the same key)
>> need to
>> > > be
>> > > > in
>> > > > > > the
>> > > > > > > > > same container so that they can share the same key/value
>> > local
>> > > > > store
>> > > > > > on
>> > > > > > > > the
>> > > > > > > > > same physical machine.
>> > > > > > > > >
>> > > > > > > > > >2. When you use "Join" as an example, you basically mean
>> > > > multiple
>> > > > > > > > > >co-partitioned streams, right? This is opposed to
>> multiple,
>> > > > > > > > > >independently-partitioned streams or a single stream.
>> Would
>> > be
>> > > > > nice
>> > > > > > to
>> > > > > > > > > >formulate the proposal in these more general terms.
>> > > > > > > > >
>> > > > > > > > > > I thought "join" is commonly used to refer to the join
>> > > > operation
>> > > > > > with
>> > > > > > > > > co-partitioned stream but I may be wrong. I have updated
>> the
>> > > wiki
>> > > > > to
>> > > > > > > > > explicitly mention "co-partitioned stream". Does this look
>> > > better
>> > > > > > now?
>> > > > > > > > >
>> > > > > > > > > >3. When switching SSP groupers, how will the users avoid
>> the
>> > > > > > > > > >org.apache.samza.checkpoint.kafka.
>> > > > DifferingSystemStreamPartition
>> > > > > > > > > GrouperFactoryValues
>> > > > > > > > > >exception?
>> > > > > > > > >
>> > > > > > > > > I think we can hardcode new logic in
>> > > KafkaCheckpointLogKey.scala
>> > > > > such
>> > > > > > > > that
>> > > > > > > > > exception will not be thrown if new grouper is
>> > > > > > > > > GroupByPartitionWithFixedTaskNum and old grouper is
>> > > > > > GroupByPartition.
>> > > > > > > > Does
>> > > > > > > > > this look reasonable?
>> > > > > > > > >
>> > > > > > > > > >4. Partition to task assignment is meaningless without
>> key
>> > to
>> > > > > > > partition
>> > > > > > > > > >mapping. The real semantics are captured in the external
>> > > > > requirement
>> > > > > > > for
>> > > > > > > > > >partitioning via hash+modulo. But in that case, iiuc,
>> only
>> > the
>> > > > > > > partition
>> > > > > > > > > >count matters. So why not just store the original
>> partition
>> > > > count
>> > > > > > > rather
>> > > > > > > > > >than the whole mapping?
>> > > > > > > > >
>> > > > > > > > > I think storing the previous task-to-partition mapping is
>> > more
>> > > > > > general
>> > > > > > > > than
>> > > > > > > > > storing the partition count of all topics for the
>> following
>> > > > > reasons:
>> > > > > > > > >
>> > > > > > > > > - Samza already stores the task-to-container mapping and
>> > > > > > > > container-to-host
>> > > > > > > > > mapping in the coordinator stream. It seems consistent to
>> > also
>> > > > > store
>> > > > > > > the
>> > > > > > > > > partition-to-task mapping. And this information may be
>> useful
>> > > for
>> > > > > > other
>> > > > > > > > > use-case such as debugging.
>> > > > > > > > >
>> > > > > > > > > - By having the new interface take the previous
>> > > task-to-partition
>> > > > > > > > > assignment instead of a topic-to-partition-count mapping
>> as
>> > new
>> > > > > > > > parameter,
>> > > > > > > > > we can potentially have grouper implementation to support
>> > other
>> > > > > types
>> > > > > > > of
>> > > > > > > > > input systems.
>> > > > > > > > >
>> > > > > > > > > > - It is slightly simpler to store the task-to-partition
>> > > assignment
>> > > > > > > because
>> > > > > > > > > we don't need to know whether this is the first time a
>> job is
>> > > > > started
>> > > > > > > or
>> > > > > > > > > not. On the other hand, you can write
>> > topic-to-partition-count
>> > > > > > mapping
>> > > > > > > to
>> > > > > > > > > the coordinator stream only if this is the first time the
>> job
>> > > is
>> > > > > run
>> > > > > > > > >
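To make the trade-off above concrete, here is a minimal sketch of a grouper that takes the previous task-to-partition assignment as its input. It is not the SEP-5 implementation. The function name, the integer task ids 0..N-1, and the rule "partition p goes to task p % N" are assumptions made for illustration; an empty previous mapping is treated as a fresh job.

```python
from typing import Dict, List


def group_with_fixed_task_num(
    previous: Dict[int, List[int]],
    current_partitions: List[int],
) -> Dict[int, List[int]]:
    """Assign partitions to tasks while keeping the task count fixed.

    `previous` is the stored task-to-partition mapping (hypothetical shape:
    task id -> list of partition ids, with task ids 0..N-1).
    """
    task_count = len(previous)
    if task_count == 0:
        # No stored mapping: behave like one task per partition.
        return {p: [p] for p in sorted(current_partitions)}

    # Keep the same tasks; route every partition (old or new) by modulo.
    grouped: Dict[int, List[int]] = {task: [] for task in sorted(previous)}
    for p in sorted(current_partitions):
        grouped[p % task_count].append(p)
    return grouped


# Two partitions originally, expanded to four: the task count stays at two.
before = group_with_fixed_task_num({}, [0, 1])
after = group_with_fixed_task_num(before, [0, 1, 2, 3])
print(before)
print(after)
```

Because the task count is derived from the stored mapping, the original partition count does not have to be stored separately in this sketch.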
>> > > > > > > > > Thanks,
>> > > > > > > > > Dong
>> > > > > > > > >
>> > > > > > > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <
>> > > > jacob.m...@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hey Dong,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for the SEP. Supporting partition changes is
>> > > critically
>> > > > > > > > important
>> > > > > > > > > > for stateful Samza jobs, so it's great to see some
>> ideas on
>> > > > that
>> > > > > > > front!
>> > > > > > > > > >
>> > > > > > > > > > Sorry for the late feedback, but I have a few thoughts
>> to
>> > > > > > contribute.
>> > > > > > > > > >
>> > > > > > > > > > Big +1 on Navina's comment:
>> > > > > > > > > >
>> > > > > > > > > > > My biggest gripe with this SEP is that it seems like a
>> > > > > > tailor-made
>> > > > > > > > > > > solution
>> > > > > > > > > > > that relies on the semantics of the Kafka system and
>> yet,
>> > > we
>> > > > > are
>> > > > > > > > trying
>> > > > > > > > > > to
>> > > > > > > > > > > masquerade that as operational requirements for other
>> > > systems
>> > > > > > > > > interacting
>> > > > > > > > > > > with Samza. (Not to say that this is the first time
>> such
>> > a
>> > > > > choice
>> > > > > > > is
>> > > > > > > > > > being
>> > > > > > > > > > > made in the Samza design). I am not seeing how this
>> can be a
>> > > > > > "general"
>> > > > > > > > > > > solution for all input systems. That's my two cents. I
>> > > would
>> > > > > like
>> > > > > > > to
>> > > > > > > > > hear
>> > > > > > > > > > > alternative points of view for this from other devs.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Two examples of this:
>> > > > > > > > > > 1. This is mostly a hypothetical, but some message
>> brokers
>> > > may
>> > > > > use
>> > > > > > > key
>> > > > > > > > > > range assignment rather than hash+modulo.
>> > > > > > > > > > 2. Kafka can't reduce the number of partitions, but it
>> can
>> > > > happen
>> > > > > > on
>> > > > > > > > > other
>> > > > > > > > > > systems. For example, it may be cheaper to reduce the
>> > number
>> > > of
>> > > > > > > > > partitions
>> > > > > > > > > > on a hosted service where the cost model depends on the
>> > > number
>> > > > of
>> > > > > > > > > > partitions/shards.
>> > > > > > > > > >
>> > > > > > > > > > > It seems to me that we want a solution which doesn't depend on
>> > > > partition
>> > > > > > key
>> > > > > > > > > > assignment in the message broker. Here are a few
>> > alternatives
>> > > > > that
>> > > > > > > > > weren't
>> > > > > > > > > > discussed and I think should be considered:
>> > > > > > > > > >
>> > > > > > > > > > Alternatives in order of increasing preference:
>> > > > > > > > > > 1. Samza manages the partition hash (via some new
>> contract
>> > > with
>> > > > > the
>> > > > > > > > > > brokers) and guarantees correct routing of keys among
>> the
>> > new
>> > > > > > > > partitions.
>> > > > > > > > > > 2. Samza detects a task count change, creates a new
>> > changelog
>> > > > > with
>> > > > > > > > > correct
>> > > > > > > > > > partitions, and *somehow* reshuffles all existing
>> changelog
>> > > > data
>> > > > > > into
>> > > > > > > > the
>> > > > > > > > > > new topic and then uses the new topic from then on.
>> > (doesn't
>> > > > work
>> > > > > > > > without
>> > > > > > > > > > changelog, but in that case durability isn't paramount,
>> so
>> > we
>> > > > can
>> > > > > > > just
>> > > > > > > > > > wipe)
>> > > > > > > > > > 3. Use RPC in between stages and samza fully manages key
>> > > > > assignment
>> > > > > > > > among
>> > > > > > > > > > tasks. No on-disk topic data to clean up. Mandatory
>> > > > > repartitioning
>> > > > > > in
>> > > > > > > > the
>> > > > > > > > > > first stage to pre-scaled tasks in next stage.
>> > > > > > > > > > 4. Combined 2-3 solution
>> > > > > > > > > >
>> > > > > > > > > > Finally, some questions about the current proposal:
>> > > > > > > > > > 1. "An alternative solution is to allow task number to
>> > > increase
>> > > > > > after
>> > > > > > > > > > partition expansion and uses a proper task-to-container
>> > > > > assignment
>> > > > > > to
>> > > > > > > > > make
>> > > > > > > > > > sure the Samza output is correct." What does the
>> container
>> > > have
>> > > > > to
>> > > > > > do
>> > > > > > > > > with
>> > > > > > > > > > stateful processing or output in general?
>> > > > > > > > > > 2. When you use "Join" as an example, you basically mean
>> > > > multiple
>> > > > > > > > > > co-partitioned streams, right? This is opposed to
>> multiple,
>> > > > > > > > > > independently-partitioned streams or a single stream.
>> Would
>> > > be
>> > > > > nice
>> > > > > > > to
>> > > > > > > > > > formulate the proposal in these more general terms.
>> > > > > > > > > > 3. When switching SSP groupers, how will the users avoid
>> > the
>> > > > > > > > > > org.apache.samza.checkpoint.kafka.
>> > DifferingSystemStreamParti
>> > > > > > > > > > tionGrouperFactoryValues
>> > > > > > > > > > exception?
>> > > > > > > > > > 4. Partition to task assignment is meaningless without
>> key
>> > to
>> > > > > > > partition
>> > > > > > > > > > mapping. The real semantics are captured in the external
>> > > > > > requirement
>> > > > > > > > for
>> > > > > > > > > > partitioning via hash+modulo. But in that case, iiuc,
>> only
>> > > the
>> > > > > > > > partition
>> > > > > > > > > > count matters. So why not just store the original
>> partition
>> > > > count
>> > > > > > > > rather
>> > > > > > > > > > than the whole mapping?
>> > > > > > > > > >
>> > > > > > > > > > -Jake
>> > > > > > > > > >
>> > > > > > > > > > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <
>> > > lindon...@gmail.com
>> > > > >
>> > > > > > > wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hey Yi, Navina,
>> > > > > > > > > > >
>> > > > > > > > > > > I have updated the SEP-5 document based on our
>> > discussion.
>> > > > The
>> > > > > > > > > difference
>> > > > > > > > > > > can be found here
>> > > > > > > > > > > <https://cwiki.apache.org/confluence/pages/
>> > > > > > > diffpagesbyversion.action
>> > > > > > > > ?
>> > > > > > > > > > > pageId=70255476&selectedPageVersions=14&
>> > > selectedPageVersions
>> > > > > > =15>.
>> > > > > > > > > > > Here is the summary of changes:
>> > > > > > > > > > >
>> > > > > > > > > > > > - Added a new interface that extends the existing
>> interface
>> > > > > > > > > > > SystemStreamPartitionGrouper. Newly-added grouper
>> class
>> > > > should
>> > > > > > > > > implement
>> > > > > > > > > > > this interface.
>> > > > > > > > > > > - Explained in the Rejected Alternative Section why we
>> > > don't
>> > > > > add
>> > > > > > > new
>> > > > > > > > > > method
>> > > > > > > > > > > in the existing interface
>> > > > > > > > > > > - Explained in the Rejected Alternative Section why we
>> > > don't add
>> > > > > > > > > config/class
>> > > > > > > > > > > for user to specify new-partition to old-partition
>> > mapping.
>> > > > > > > > > > >
>> > > > > > > > > > > Can you take another look at the proposal and let me
>> know
>> > > if
>> > > > > > there
>> > > > > > > is
>> > > > > > > > > any
>> > > > > > > > > > > concern?
>> > > > > > > > > > >
>> > > > > > > > > > > Cheers,
>> > > > > > > > > > > Dong
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <
>> > > > lindon...@gmail.com
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hey Yi,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks much for the comment. I have updated the doc
>> to
>> > > > > address
>> > > > > > > all
>> > > > > > > > > your
>> > > > > > > > > > > > comments except the one related to the interface. I
>> am
>> > > not
>> > > > > > sure I
>> > > > > > > > > > > > understand your suggestion of the new interface.
>> Will
>> > > > discuss
>> > > > > > > > > tomorrow.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > Dong
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <
>> > > > nickpa...@gmail.com
>> > > > > >
>> > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > >> Hi, Dong,
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> Thanks for the detailed design doc for a
>> long-awaited
>> > > > feature
>> > > > > > in
>> > > > > > > > > Samza!
>> > > > > > > > > > > >> Really appreciate it! I did a quick pass and have
>> the
>> > > > > > following
>> > > > > > > > > > > comments:
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> - minor: "limit the maximum size of partition" ==>
>> > > "limit
>> > > > > the
>> > > > > > > > > maximum
>> > > > > > > > > > > size
>> > > > > > > > > > > >> of each partition"
>> > > > > > > > > > > >> - "However, Samza currently is not able to handle
>> > > > partition
>> > > > > > > > > expansion
>> > > > > > > > > > of
>> > > > > > > > > > > >> the input streams"==>better point out "for stateful
>> > > jobs".
>> > > > > For
>> > > > > > > > > > stateless
>> > > > > > > > > > > >> jobs, simply bouncing the job now can pick up the
>> new
>> > > > > > > partitions.
>> > > > > > > > > > > >> - "it is possible (e.g. with Kafka) that messages
>> > with a
>> > > > > given
>> > > > > > > key
>> > > > > > > > > > > exists
>> > > > > > > > > > > > > >> in both partition 1 and 3. Because GroupByPartition
>> > will
>> > > > > assign
>> > > > > > > > > > > partition 1
>> > > > > > > > > > > >> and 3 to different tasks, messages with the same
>> key
>> > may
>> > > > be
>> > > > > > > > handled
>> > > > > > > > > by
>> > > > > > > > > > > >> different task/container/process and their state
>> will
>> > be
>> > > > > > stored
>> > > > > > > in
>> > > > > > > > > > > >> different changelog partition." The problem
>> statement
>> > is
>> > > > not
>> > > > > > > super
>> > > > > > > > > > clear
>> > > > > > > > > > > > > >> here. The issue with stateful jobs is: after
>> > > > > GroupByPartition
>> > > > > > > > > > assigns
>> > > > > > > > > > > >> partition 1 and 3 to different tasks, the new task
>> > > > handling
>> > > > > > > > > partition
>> > > > > > > > > > 3
>> > > > > > > > > > > >> does not have the previous state to resume the
>> work.
>> > > e.g.
>> > > > a
>> > > > > > > > page-key
>> > > > > > > > > > > based
>> > > > > > > > > > > >> counter would start from 0 in the new task for a
>> > > specific
>> > > > > key,
>> > > > > > > > > instead
>> > > > > > > > > > > of
>> > > > > > > > > > > >> resuming the previous count 50 held by task 1.
>> > > > > > > > > > > >> - minor rewording: "the first solution in this doc"
>> > ==>
>> > > > "the
>> > > > > > > > > solution
>> > > > > > > > > > > >> proposed in this doc"
>> > > > > > > > > > > >> - "Thus additional development work is needed in
>> Kafka
>> > > to
>> > > > > meet
>> > > > > > > > this
>> > > > > > > > > > > >> requirement" It would be good to link to a KIP if
>> and
>> > > when
>> > > > > it
>> > > > > > > > exists
>> > > > > > > > > > > >> - Instead of touching/deprecating the interface
>> > > > > > > > > > > >> SystemStreamPartitionGrouper, I would recommend to
>> > have
>> > > a
>> > > > > > > > different
>> > > > > > > > > > > >> implementation class of the interface, which in the
>> > > > > > constructor
>> > > > > > > of
>> > > > > > > > > the
>> > > > > > > > > > > >> grouper, takes two parameters: a) the previous task
>> > > number
>> > > > > > read
>> > > > > > > > from
>> > > > > > > > > > the
>> > > > > > > > > > > >> coordinator stream; b) the configured
>> new-partition to
>> > > > > > > > old-partition
>> > > > > > > > > > > >> mapping policy. Then, the grouper's interface
>> method
>> > > stays
>> > > > > the
>> > > > > > > > same
>> > > > > > > > > > and
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >> behavior of the grouper is more configurable which
>> is
>> > > good
>> > > > > to
>> > > > > > > > > support
>> > > > > > > > > > a
>> > > > > > > > > > > >> broader set of use cases in addition to Kafka's
>> > built-in
>> > > > > > > partition
>> > > > > > > > > > > >> expansion policies.
>> > > > > > > > > > > >> - Minor renaming suggestion to the new grouper
>> class
>> > > > names:
>> > > > > > > > > > > >> GroupByPartitionWithFixedTaskNum
>> > > > > > > > > > > >> and GroupBySystemStreamPartitionWithFixedTaskNum
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> Thanks!
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> - Yi
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <
>> > > > > > lindon...@gmail.com
>> > > > > > > >
>> > > > > > > > > > wrote:
>> > > > > > > > > > > >>
>> > > > > > > > > > > >> > Hey Navina,
>> > > > > > > > > > > >> >
>> > > > > > > > > > > >> > Thanks much for the comment. Please see my
>> response
>> > > > below.
>> > > > > > > > > > > >> >
>> > > > > > > > > > > >> > Regarding your biggest gripe with the SEP, I
>> > > personally
>> > > > > > think
>> > > > > > > > the
>> > > > > > > > > > > > > >> > operational requirements proposed in the SEP are
>> > pretty
>> > > > > > general
>> > > > > > > > and
>> > > > > > > > > > > >> could be
>> > > > > > > > > > > >> > easily enforced by other systems. The reason is
>> that
>> > > the
>> > > > > > > modulo
>> > > > > > > > > > > >> operation
>> > > > > > > > > > > >> > is pretty standard and the default option when we
>> > > choose
>> > > > > > > > > partition.
>> > > > > > > > > > > And
>> > > > > > > > > > > >> > usually the underlying system allows user to
>> select
>> > > > > > arbitrary
>> > > > > > > > > > > partition
>> > > > > > > > > > > >> > number if it supports partition expansion. Do you
>> > know
>> > > > any
>> > > > > > > > system
>> > > > > > > > > > that
>> > > > > > > > > > > >> does
>> > > > > > > > > > > > > >> > not meet these two requirements?
>> > > > > > > > > > > >> >
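A small illustration of why the modulo convention matters. It assumes the two requirements are (1) keys are routed by hash(key) % partition_count and (2) the expanded partition count is a multiple of the old one. `zlib.crc32` is only a deterministic stand-in for the producer's hash (Kafka's default partitioner uses murmur2), and the key names are made up.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Route a key by hash-then-modulo (crc32 is a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


old_count, new_count = 2, 4  # expansion to a multiple of the old count

for key in ["page-a", "page-b", "page-c", "page-d"]:
    old_p = partition_for(key, old_count)
    new_p = partition_for(key, new_count)
    # Since old_count divides new_count, (h % new_count) % old_count == h % old_count,
    # so the partition a key used to live in can be recovered from its new partition.
    assert new_p % old_count == old_p
    print(key, "old partition:", old_p, "new partition:", new_p)
```

If the new count were not a multiple of the old one (for example 2 to 3), the assertion would not hold in general.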
>> > > > > > > > > > > >> > Regarding your comment of the Motivation
>> section, I
>> > > > > renamed
>> > > > > > > the
>> > > > > > > > > > first
>> > > > > > > > > > > >> > section as "Problem and Goal" and specified that
>> > "*The
>> > > > > goal
>> > > > > > of
>> > > > > > > > > this
>> > > > > > > > > > > >> > proposal is to enable partition expansion of the
>> > input
>> > > > > > > > > streams*.". I
>> > > > > > > > > > > >> also
>> > > > > > > > > > > >> > put a sentence at the end of the Motivation
>> section
>> > > that
>> > > > > > "*The
>> > > > > > > > > > feature
>> > > > > > > > > > > >> of
>> > > > > > > > > > > >> > task expansion is out of the scope of this
>> proposal
>> > > and
>> > > > > will
>> > > > > > > be
>> > > > > > > > > > > >> addressed
>> > > > > > > > > > > >> > in a future SEP*". The second paragraph in the
>> > > > Motivation
>> > > > > > > > section
>> > > > > > > > > is
>> > > > > > > > > > > >> mainly
>> > > > > > > > > > > >> > used to explain the thinking process that we have
>> > gone
>> > > > > > > through,
>> > > > > > > > > what
>> > > > > > > > > > > >> other
>> > > > > > > > > > > > > >> > alternatives we have considered, and what we plan to
>> do in
>> > > > Samza
>> > > > > > in
>> > > > > > > > the
>> > > > > > > > > > next
>> > > > > > > > > > > >> step.
>> > > > > > > > > > > >> >
>> > > > > > > > > > > >> > To answer your question why increasing the
>> partition
>> > > > > number
>> > > > > > > will
>> > > > > > > > > > > >> increase
>> > > > > > > > > > > >> > the throughput of the kafka consumer in the
>> > container,
>> > > > > Kafka
>> > > > > > > > > > consumer
>> > > > > > > > > > > >> can
>> > > > > > > > > > > >> > potentially fetch more data in one FetchResponse
>> > with
>> > > > more
>> > > > > > > > > > partitions
>> > > > > > > > > > > in
>> > > > > > > > > > > >> > the FetchRequest. This is because we limit the
>> > maximum
>> > > > > > amount
>> > > > > > > of
>> > > > > > > > > > data
>> > > > > > > > > > > >> that
>> > > > > > > > > > > > > >> > can be fetched for a given partition in the
>> > > FetchResponse.
>> > > > > > This
>> > > > > > > by
>> > > > > > > > > > > >> default is
>> > > > > > > > > > > > > >> > set to 1 MB. And there are reasons that we cannot
>> > > > > arbitrarily
>> > > > > > > > bump
>> > > > > > > > > up
>> > > > > > > > > > > >> this
>> > > > > > > > > > > >> > limit.
>> > > > > > > > > > > >> >
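The arithmetic behind this point, as a rough sketch: if each partition is capped at 1 MB per response, the upper bound on one response grows with the number of partitions in the request. The function name is hypothetical, and the sketch ignores any overall response-size cap or other limits that may also apply.

```python
PER_PARTITION_LIMIT_BYTES = 1024 * 1024  # the 1 MB per-partition default mentioned above


def max_fetch_response_bytes(
    num_partitions: int,
    per_partition_limit: int = PER_PARTITION_LIMIT_BYTES,
) -> int:
    """Upper bound on data in one response when only the per-partition cap applies."""
    return num_partitions * per_partition_limit


for n in (1, 2, 4):
    print(n, "partition(s):", max_fetch_response_bytes(n), "bytes at most")
```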
>> > > > > > > > > > > >> > To answer your question how partition expansion
>> in
>> > > Kafka
>> > > > > > > impacts
>> > > > > > > > > the
>> > > > > > > > > > > >> > clients, Kafka consumer is able to automatically
>> > > detect
>> > > > > new
>> > > > > > > > > > partition
>> > > > > > > > > > > of
>> > > > > > > > > > > >> > the topic and reassign all (both old and new)
>> > > partitions
>> > > > > > > across
>> > > > > > > > > > > >> consumers
>> > > > > > > > > > > >> > in the consumer group IF you tell consumer the
>> topic
>> > > to
>> > > > be
>> > > > > > > > > > subscribed.
>> > > > > > > > > > > >> But
>> > > > > > > > > > > >> > consumer in Samza's container uses another way of
>> > > > > > > subscription.
>> > > > > > > > > > > Instead
>> > > > > > > > > > > >> of
>> > > > > > > > > > > >> > subscribing to the topic, the consumer in Samza's
>> > > > > container
>> > > > > > > > > > subscribes
>> > > > > > > > > > > >> to
>> > > > > > > > > > > >> > the specific partitions of the topic. In this
>> case,
>> > if
>> > > > new
>> > > > > > > > > > partitions
>> > > > > > > > > > > >> have
>> > > > > > > > > > > >> > been added, Samza will need to explicitly
>> subscribe
>> > to
>> > > > the
>> > > > > > new
>> > > > > > > > > > > >> partitions
>> > > > > > > > > > > >> > of the topic. The "Handle partition expansion
>> while
>> > > > tasks
>> > > > > > are
>> > > > > > > > > > running"
>> > > > > > > > > > > >> > section in the SEP addresses this issue in Samza
>> --
>> > it
>> > > > > > > > > recalculates
>> > > > > > > > > > > the
>> > > > > > > > > > > >> job
>> > > > > > > > > > > > > >> > model and restarts the container so that the consumer can
>> > > > subscribe
>> > > > > > to
>> > > > > > > > the
>> > > > > > > > > > new
>> > > > > > > > > > > >> > partitions.
>> > > > > > > > > > > >> >
>> > > > > > > > > > > >> > I will ask other dev to take a look at the
>> > proposal. I
>> > > > > will
>> > > > > > > > start
>> > > > > > > > > > the
>> > > > > > > > > > > >> > voting thread tomorrow if there is no further
>> > concern
>> > > > with
>> > > > > > the
>> > > > > > > > > SEP.
>> > > > > > > > > > > >> >
>> > > > > > > > > > > >> > Thanks!
>> > > > > > > > > > > >> > Dong
>> > > > > > > > > > > >> >
>> > > > > > > > > > > >> >
>> > > > > > > > > > > >> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh
>> > > > (Apache) <
>> > > > > > > > > > > >> > nav...@apache.org>
>> > > > > > > > > > > >> > wrote:
>> > > > > > > > > > > >> >
>> > > > > > > > > > > >> > > Hey Dong,
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > >  I have updated the motivation section to
>> > clarify
>> > > > > this.
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > Thanks for updating the motivation. Couple of
>> > notes
>> > > > > here:
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > 1.
>> > > > > > > > > > > >> > > > "The motivation of increasing partition
>> number
>> > of
>> > > > > Kafka
>> > > > > > > > topic
>> > > > > > > > > > > >> includes
>> > > > > > > > > > > >> > 1)
>> > > > > > > > > > > >> > > limit the maximum size of a partition in order
>> to
>> > > > > improve
>> > > > > > > > broker
>> > > > > > > > > > > >> > > performance and 2) increase throughput of Kafka
>> > > > consumer
>> > > > > > in
>> > > > > > > > the
>> > > > > > > > > > > Samza
>> > > > > > > > > > > >> > > container."
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > It's unclear to me how increasing the partition
>> > > number
>> > > > > > will
>> > > > > > > > > > increase
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >> > > throughput of the kafka consumer in the
>> container?
>> > > > > > > > > Theoretically,
>> > > > > > > > > > > you
>> > > > > > > > > > > >> > will
>> > > > > > > > > > > >> > > still be consuming the same amount of data in
>> the
>> > > > > > container,
>> > > > > > > > > > > >> irrespective
>> > > > > > > > > > > >> > > of whether it is coming from one partition or
>> more
>> > > > than
>> > > > > > one
>> > > > > > > > > > expanded
>> > > > > > > > > > > >> > > partitions. Can you please explain it for me
>> here,
>> > > > what
>> > > > > > you
>> > > > > > > > mean
>> > > > > > > > > > by
>> > > > > > > > > > > >> that?
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > 2. I believe the second paragraph under
>> motivation
>> > > is
>> > > > > > simply
>> > > > > > > > > > talking
>> > > > > > > > > > > >> > about
>> > > > > > > > > > > >> > > the scope of the current SEP. It will be
>> easier to
>> > > > read
>> > > > > if
>> > > > > > > > what
>> > > > > > > > > > > >> solution
>> > > > > > > > > > > >> > is
>> > > > > > > > > > > >> > > included in this SEP and what is left out as
>> not
>> > in
>> > > > > scope.
>> > > > > > > > (for
>> > > > > > > > > > > >> example,
>> > > > > > > > > > > >> > > expansions for stateful jobs is supported or
>> not).
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > > We need to persist the task-to-sspList
>> mapping
>> > in
>> > > > the
>> > > > > > > > > > > >> > > coordinator stream so that the job can derive
>> the
>> > > > > original
>> > > > > > > > > number
>> > > > > > > > > > of
>> > > > > > > > > > > >> > > partitions of each input stream regardless of
>> how
>> > > many
>> > > > > > times
>> > > > > > > > the
>> > > > > > > > > > > >> > partition
>> > > > > > > > > > > >> > > has expanded. Does this make sense?
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > Yes. It does!
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > > I am not sure how this is related to the
>> > locality
>> > > > > > though.
>> > > > > > > > Can
>> > > > > > > > > > you
>> > > > > > > > > > > >> > clarify
>> > > > > > > > > > > >> > > your question if I haven't answered your
>> question?
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > It's not related. I just meant to give an
>> example
>> > of
>> > > > yet
>> > > > > > > > another
>> > > > > > > > > > > >> > > coordinator message that is persisted. Your
>> > > > ssp-to-task
>> > > > > > > > mapping
>> > > > > > > > > is
>> > > > > > > > > > > >> > > following a similar pattern for persisting.
>> Just
>> > > > wanted
>> > > > > to
>> > > > > > > > > clarify
>> > > > > > > > > > > >> that.
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > > Can you let me know if this, together with
>> the
>> > > > answers
>> > > > > > in
>> > > > > > > > the
>> > > > > > > > > > > >> previous
>> > > > > > > > > > > >> > > email, addresses all your questions?
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > Yes. I believe you have addressed most of my
>> > > > questions.
>> > > > > > > Thanks
>> > > > > > > > > for
>> > > > > > > > > > > >> taking
>> > > > > > > > > > > >> > > time to do that.
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > > Is there specific question you have regarding
>> > > > > partition
>> > > > > > > > > > > >> > > expansion in Kafka?
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > I guess my questions are on how partition
>> > expansion
>> > > in
>> > > > > > Kafka
>> > > > > > > > > > impacts
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >> > > clients. Iiuc, partition expansions are done
>> > > manually
>> > > > in
>> > > > > > > Kafka
>> > > > > > > > > > based
>> > > > > > > > > > > >> on
>> > > > > > > > > > > >> > the
>> > > > > > > > > > > >> > > bytes-in rate of the partition. Do the existing
>> > > kafka
>> > > > > > > clients
>> > > > > > > > > > handle
>> > > > > > > > > > > >> this
>> > > > > > > > > > > >> > > expansion automatically? if yes, how does it
>> work?
>> > > If
>> > > > > not,
>> > > > > > > are
>> > > > > > > > > > there
>> > > > > > > > > > > >> > plans
>> > > > > > > > > > > >> > > to support it in the future?
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > > Thus user's job should not need to bootstrap
>> > > > key/value
>> > > > > > > store
>> > > > > > > > > > from
>> > > > > > > > > > > >> the
>> > > > > > > > > > > >> > > changelog topic.
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > Why is this discussion relevant here? Key/value
>> > > store
>> > > > /
>> > > > > > > > > changelog
>> > > > > > > > > > > >> topic
>> > > > > > > > > > > >> > > partition is scoped with the context of a task.
>> > > Since
>> > > > we
>> > > > > > are
>> > > > > > > > not
>> > > > > > > > > > > >> changing
>> > > > > > > > > > > >> > > the number of tasks, I don't think it is
>> required
>> > to
>> > > > > > mention
>> > > > > > > > it
>> > > > > > > > > > > here.
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > > The new method takes the
>> > > > SystemStreamPartition-to-Task
>> > > > > > > > > > assignment
>> > > > > > > > > > > >> from
>> > > > > > > > > > > >> > > the previous job model which can be read from
>> the
>> > > > > > > coordinator
>> > > > > > > > > > > stream.
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > Jobmodel is currently not persisted to
>> coordinator
>> > > > > stream.
>> > > > > > > In
>> > > > > > > > > your
>> > > > > > > > > > > >> > design,
>> > > > > > > > > > > >> > > you talk about writing separate coordinator
>> > messages
>> > > > for
>> > > > > > > > > > ssp-to-task
>> > > > > > > > > > > >> > > assignments. Hence, please correct this
>> statement.
>> > > It
>> > > > is
>> > > > > > > kind
>> > > > > > > > of
>> > > > > > > > > > > >> > misleading
>> > > > > > > > > > > >> > > to the reader.
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > My biggest gripe with this SEP is that it seems
>> > > like a
>> > > > > > > > > tailor-made
>> > > > > > > > > > > >> > solution
>> > > > > > > > > > > >> > > that relies on the semantics of the Kafka
>> system
>> > and
>> > > > > yet,
>> > > > > > we
>> > > > > > > > are
>> > > > > > > > > > > >> trying
>> > > > > > > > > > > >> > to
>> > > > > > > > > > > >> > > masquerade that as operational requirements for
>> > > other
>> > > > > > > systems
>> > > > > > > > > > > >> interacting
>> > > > > > > > > > > >> > > with Samza. (Not to say that this is the first
>> > time
>> > > > > such a
>> > > > > > > > > choice
>> > > > > > > > > > is
>> > > > > > > > > > > >> > being
>> > > > > > > > > > > >> > > made in the Samza design). I am not seeing how
>> > this
>> > > > can be
>> > > > > a
>> > > > > > > > > > "general"
>> > > > > > > > > > > >> > > solution for all input systems. That's my two
>> > > cents. I
>> > > > > > would
>> > > > > > > > > like
>> > > > > > > > > > to
>> > > > > > > > > > > >> hear
>> > > > > > > > > > > >> > > alternative points of view for this from other
>> > devs.
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > Please make sure you have enough eyes on this
>> SEP.
>> > > If
>> > > > > you
>> > > > > > > do,
>> > > > > > > > > > please
>> > > > > > > > > > > >> > start
>> > > > > > > > > > > >> > > a VOTE thread to approve this SEP.
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > Thanks!
>> > > > > > > > > > > >> > > Navina
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <
>> > > > > > > > lindon...@gmail.com
>> > > > > > > > > >
>> > > > > > > > > > > >> wrote:
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> > > > Hey Navina,
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > > I have updated the wiki based on your
>> > suggestion.
>> > > > More
>> > > > > > > > > > > >> specifically, I
>> > > > > > > > > > > >> > > have
>> > > > > > > > > > > >> > > > made the following changes:
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > > >> > > > - Improved Problem section and Motivation section to describe why we use
>> > > > > > > > > > > > >> > > > the solution in this proposal instead of tackling the problem of task
>> > > > > > > > > > > > >> > > > expansion directly.
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > > >> > > > - Illustrated the design in a way that isn't bound to Kafka. Kafka is only
>> > > > > > > > > > > > >> > > > used as an example to illustrate why we want to support partition expansion
>> > > > > > > > > > > > >> > > > and whether the operational requirement can be supported when Kafka is used
>> > > > > > > > > > > > >> > > > as the input system. Note that the proposed solution should work for any
>> > > > > > > > > > > > >> > > > input system that meets the operational requirement described in the wiki.
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > > - Fixed the problem in the figure.
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > > >> > > > - Added a new class GroupBySystemStreamPartitionFixedTaskNum to the wiki.
>> > > > > > > > > > > > >> > > > Together with GroupByPartitionFixedTaskNum, it should ensure that we have a
>> > > > > > > > > > > > >> > > > solution to enable partition expansion for all users that are using a
>> > > > > > > > > > > > >> > > > pre-defined grouper in Samza. Note that users who use a custom grouper
>> > > > > > > > > > > > >> > > > would need to update their implementation.
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > > >> > > > Can you let me know if this, together with the answers in the previous
>> > > > > > > > > > > > >> > > > email, addresses all your questions? Thanks for taking time to review the
>> > > > > > > > > > > > >> > > > proposal.
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > > Regards,
>> > > > > > > > > > > >> > > > Dong
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > > > > Hey Navina,
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > > Thanks much for your comments. Please see my reply inline.
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <
>> > > > > > > > > > > >> > > > > nav...@apache.org> wrote:
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > >> Thanks for the SEP, Dong. I have a couple of questions to understand your
>> > > > > > > > > > > > >> > > > >> proposal better:
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > > >> > > > >> * Under motivation, you mention that "_We expect this solution to work
>> > > > > > > > > > > > >> > > > >> similarly with other input system as well._", yet I don't see any
>> > > > > > > > > > > > >> > > > >> discussion on how it will work with other input systems. That is, what kind
>> > > > > > > > > > > > >> > > > >> of contract does Samza expect from other input systems? If we are not
>> > > > > > > > > > > > >> > > > >> planning to provide a generic solution, it might be worth calling it out in
>> > > > > > > > > > > > >> > > > >> the SEP.
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > > I think the contract we expect from other systems is exactly the
>> > > > > > > > > > > > >> > > > > operational requirement mentioned in the SEP, i.e. partitions should always
>> > > > > > > > > > > > >> > > > > be doubled and the partitioner should take the hash modulo the number of
>> > > > > > > > > > > > >> > > > > partitions. SEP-5 should also allow partition expansion of all input
>> > > > > > > > > > > > >> > > > > systems that meet these two requirements. I have updated the motivation
>> > > > > > > > > > > > >> > > > > section to clarify this.
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > > >> > > > >> * I understand the partition mapping logic you have proposed. But I think
>> > > > > > > > > > > > >> > > > >> the example explanation doesn't match the diagram. In the diagram, after
>> > > > > > > > > > > > >> > > > >> expansion, partition-0 and partition-1 are pointing to bucket 0 and
>> > > > > > > > > > > > >> > > > >> partition-3 and partition-4 are pointing to bucket 1. I think the former
>> > > > > > > > > > > > >> > > > >> has to be partition-0 and partition-2, and the latter partition-1 and
>> > > > > > > > > > > > >> > > > >> partition-3. If I am wrong, please help me understand the logic :)
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > > Good catch. I will update the figure to fix this problem.
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > > >> > > > >> * I don't know how partition expansion in Kafka works. I am familiar with
>> > > > > > > > > > > > >> > > > >> how shard splitting happens in Kinesis - there is a hierarchical relation
>> > > > > > > > > > > > >> > > > >> between the parent and child shards. This way, it will also allow the
>> > > > > > > > > > > > >> > > > >> shards to be merged back. IIUC, Kafka only supports partition "expansion",
>> > > > > > > > > > > > >> > > > >> as opposed to "splits". Can you provide some context or a link related to
>> > > > > > > > > > > > >> > > > >> how partition expansion works in Kafka?
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > > I couldn't find any wiki on partition expansion in Kafka. The partition
>> > > > > > > > > > > > >> > > > > expansion logic in Kafka is very simple -- it just adds new partitions to
>> > > > > > > > > > > > >> > > > > the existing topic. Is there a specific question you have regarding
>> > > > > > > > > > > > >> > > > > partition expansion in Kafka?
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > > >> > > > >> * Are you only recommending that expansion can be supported for samza jobs
>> > > > > > > > > > > > >> > > > >> that use Kafka as input systems **and** configure the SSPGrouper as
>> > > > > > > > > > > > >> > > > >> GroupByPartitionFixedTaskNum? Sounds to me like this only applies for
>> > > > > > > > > > > > >> > > > >> GroupByPartition. Please correct me if I am wrong. What is the expectation
>> > > > > > > > > > > > >> > > > >> for custom SSP Groupers?
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > > The expansion can be supported for Samza jobs if the input system meets
>> > > > > > > > > > > > >> > > > > the operational requirement mentioned above. It doesn't have to use Kafka
>> > > > > > > > > > > > >> > > > > as input system.
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > > The current proposal provides a solution for jobs that currently use
>> > > > > > > > > > > > >> > > > > GroupByPartition. The proposal can be extended to support jobs that use
>> > > > > > > > > > > > >> > > > > other groupers that are pre-defined in Samza. A custom SSP grouper needs
>> > > > > > > > > > > > >> > > > > to handle partition expansion similar to how GroupByPartitionFixedTaskNum
>> > > > > > > > > > > > >> > > > > handles it, and it is the users' responsibility to update their custom
>> > > > > > > > > > > > >> > > > > grouper implementation.
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > > >> > > > >> * Regarding storing SSP-to-Task assignment to coordinator stream: Today,
>> > > > > > > > > > > > >> > > > >> the JobModel encapsulates the data model in Samza, which also includes
>> > > > > > > > > > > > >> > > > >> **TaskModels**. TaskModel typically shows the task-to-sspList mapping.
>> > > > > > > > > > > > >> > > > >> What is the reason for using a separate coordinator stream message
>> > > > > > > > > > > > >> > > > >> *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in
>> > > > > > > > > > > > >> > > > >> the coordinator stream today? The reason locality exists outside of the
>> > > > > > > > > > > > >> > > > >> JobModel is that *locality* information is written by each container,
>> > > > > > > > > > > > >> > > > >> whereas it is consumed only by the leader jobcoordinator/AM. In this case,
>> > > > > > > > > > > > >> > > > >> the writer of the mapping information and the reader is still the leader
>> > > > > > > > > > > > >> > > > >> jobcoordinator/AM. So, I want to understand the motivation for this
>> > > > > > > > > > > > >> > > > >> choice.
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > > Yes, the reason for using a separate coordinator stream message is that
>> > > > > > > > > > > > >> > > > > the task-to-sspList mapping is not currently persisted in the coordinator
>> > > > > > > > > > > > >> > > > > stream. We wouldn't need to create this new stream message if JobModel were
>> > > > > > > > > > > > >> > > > > persisted. We need to persist the task-to-sspList mapping in the
>> > > > > > > > > > > > >> > > > > coordinator stream so that the job can derive the original number of
>> > > > > > > > > > > > >> > > > > partitions of each input stream regardless of how many times the partitions
>> > > > > > > > > > > > >> > > > > have been expanded. Does this make sense?
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > > >> > > > > I am not sure how this is related to the locality though. Can you clarify
>> > > > > > > > > > > > >> > > > > your question if I haven't answered it?
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > > Thanks!
>> > > > > > > > > > > >> > > > > Dong
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > >> > > > >> Cheers!
>> > > > > > > > > > > >> > > > >> Navina
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > >> > > > >> > Hi all,
>> > > > > > > > > > > >> > > > >> >
>> > > > > > > > > > > > >> > > > >> > We created SEP-5: Enable partition expansion of input streams. Please find
>> > > > > > > > > > > > >> > > > >> > the SEP wiki in the link
>> > > > > > > > > > > > >> > > > >> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams
>> > > > > > > > > > > >> > > > >> >
>> > > > > > > > > > > > >> > > > >> > Your feedback is appreciated!
>> > > > > > > > > > > >> > > > >> >
>> > > > > > > > > > > >> > > > >> > Thanks,
>> > > > > > > > > > > >> > > > >> > Dong
>> > > > > > > > > > > >> > > > >> >
>> > > > > > > > > > > >> > > > >>
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > > >
>> > > > > > > > > > > >> > > >
>> > > > > > > > > > > >> > >
>> > > > > > > > > > > >> >
>> > > > > > > > > > > >>
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>>
>>
>> --
>> We are hiring in Streams Infra (Kafka/Samza/Datastream) !!
>>
>
>
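
For readers following the quoted exchange, the operational requirement discussed above (the partition count is always doubled, and the producer picks a partition as hash modulo the partition count) can be illustrated with a small sketch. It is not taken from the SEP or from Samza's code: `partition_for` and `task_for` are hypothetical helper names, and `zlib.crc32` stands in for whatever hash the real producer uses. The point it shows is that, under these two assumptions, taking a new partition number modulo the original partition count always leads back to the task that owned the key before the expansion.

```python
import zlib


def partition_for(key: bytes, num_partitions: int) -> int:
    """Hypothetical producer-side partitioner: hash(key) modulo partition count.

    zlib.crc32 is only a stand-in hash chosen for this sketch.
    """
    return zlib.crc32(key) % num_partitions


def task_for(partition: int, original_partitions: int) -> int:
    """Hypothetical grouper rule: fold a partition back onto the task
    (bucket) that owned its keys before any expansion."""
    return partition % original_partitions


original, expanded = 2, 4  # partition count doubled once

# Which post-expansion partitions land in which original bucket.
buckets = {
    task: [p for p in range(expanded) if task_for(p, original) == task]
    for task in range(original)
}
print(buckets)

# Every key stays with the same task before and after the expansion.
keys = [f"key-{i}".encode() for i in range(1000)]
stable = all(
    task_for(partition_for(k, expanded), original) == partition_for(k, original)
    for k in keys
)
print(stable)
```

With two original partitions expanded to four, the sketch groups partition-0 with partition-2 and partition-1 with partition-3, which is the mapping the reviewer expected the figure to show. The property relies on the new count being a multiple of the original one, which is why the doubling requirement matters.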
