Thanks for the reply Jacob. Please see my comment inline.

On Mon, Jun 12, 2017 at 7:51 PM, Jacob Maes <jacob.m...@gmail.com> wrote:

> >
> > - For users that need partition expansion of the input streams for
> stateful
> > job, they have a really big headache in the sense that Samza does not
> allow
> > partition expansion for stateful job. SEP-5 addresses this headache for
> > them.
> > You are right that SEP-5 requires user to understand and enforce
> > limitations across organizations. But it is still much better than not
> > allowing user to expansion partition for stateful jobs at all, right?
> Did I
> > miss something here?
>
> I guess this one is a matter of perspective.
>
> One argument is that if the system supports one case, it's better than none
> because there is one less scenario in which the system does the wrong
> thing.
>
> The counter argument is for uniform and consistent behavior, which is easy
> for users to understand and properly leverage.
>
> Specifically, I'd argue that the current rule is very simple: "you cannot
> repartition inputs on a stateful job, so you must over-partition the
> initial implementation". To me, while that rule is not ideal, its
> simplicity is better that introducing a new solution that has a bunch of
> caveats, any one of which could be missed. If any one of the assumptions in
> this SEP design are violated, the job would behave incorrectly. That puts a
> lot more burden on the users than the simpler rule.
>

I agree that we have different perspective here. It is true that user would
mess up their job if they used this feature in a wrong way, i.e. violate
the assumption made in SEP-5. On the other hand, I think there is always a
way for user to mess up their job if they configure the Samza job
incorrectly. I also think the assumption made in this SEP is not
particularly harder to understand than other existing configs in Samza.

The answer to this can be subjective. I would love to hear perspective from
other developers on this issue.


>
> That's why I mentioned a few alternatives that, while more complex to
> implement, would provide a more consistent behavior with simple rules for
> the users.
>

I am open to discuss alternative solutions that can address the the problem
in a better manner. I am not opposed to complexity as long as it gives us
good long term benefits.

Here are my current concern with the three alternatives you described
earlier:

- The first alternative requires support from input system which is
currently not available. It will limit the usage of partition expansion to
only systems that support such interface. And it is not guaranteed that we
can persuade the developer of the input system to add this interface. This
is not desirable for Samza in the long term.

- I can not comment on the second alternative because I don't understand
how it reshuffles all existing changelog data. We can discuss more if there
is more specific detail. My gut feel is that this will be complex and
carries performance overhead.

- The third alternative requires performance overhead. Given that user can
already use this solution to enable partition expansion, maybe Samza
developers can provide more input as to why we are not doing it by default.
My gut feel is that it carries considerable performance overhead and
increases the cost-to-serve Samze job (e.g. disk usage), which may make it
undesirable in the long term.



>
> Yes, we need a similar check for GroupBySystemStreamPartitionWi
> > thFixedTaskNum
> > as well. If there is more grouper classes needed in the future, we can
> > solve this problem cleanly without new config. Given the
> > previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will
> throw
> > exception if and only if newGrouperClass is an instance of
> > previousGrouperClass.
> > GroupBySystemStreamPartitionWithFixedTaskNum should extend
> > GroupBySystemStreamPartition
> > and GroupByPartitionWithFixedTaskNum should extend GroupByPartition.
> Does
> > this address your concern?
>
> Sounds workable, thanks.
>
> >
> > Can
> > you be more specific why Partition-to-task mapping is not meaningful
> > without
> > some definition of the key-to-partition assignments and why it is
> > incomplete and misleading?
>
>  A partition is (in my naive interpretation) an independent queue for
> messages of a particular key set. It is not the *identity* of the partition
> that determine the contents of the associated task's local state. Rather it
> is the *contents* of the partition that affect the task's state. A
> partiton-to-task mapping only captures an identity relationship:
> partition1->task1. Without the assumptions of this SEP, this is
> insufficient to determine the assignment of keys to tasks, which is what
> really matters. Therefore, any future feature that utilizes this mapping
> without accounting for the assumptions of this SEP is likely to
> malfunction.
>
>
I am not sure it is true that "any future feature that utilizes this
mapping without accounting for the assumptions of this SEP is likely to
malfunction". Suppose we allow user to specify new-to-old-partition
mapping, then we can use the partition-to-task mapping correctly without
replying on the assumption in this SEP, right?


>
> On Mon, Jun 12, 2017 at 5:09 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hey Jacob,
> >
> > Thanks for the explanation. It seems that your biggest concern is with
> the
> > generality of the proposal. Let me try to address this and other comments
> > below.
> >
> > 1) ... it will cause headaches for Samza users ...
> >
> > I am not sure I understand why this proposal causes headache for Samza
> > users. Here is the impact of the SEP-5 on users:
> >
> > - For users that do not need partition expansion of the input stream,
> they
> > can use Samza without change change in code/binary/config. Thus there is
> no
> > headache for them.
> >
> > - For users that need partition expansion of the input streams for
> > stateless job, they currently need to manually reboot their Samza job in
> > order to let Samza consume the new partitions created for the stream.
> SEP-5
> > actually reduced their headache by allowing Samza to automatically detect
> > and consume new partitions.
> >
> > - For users that need partition expansion of the input streams for
> stateful
> > job, they have a really big headache in the sense that Samza does not
> allow
> > partition expansion for stateful job. SEP-5 addresses this headache for
> > them.
> >
> > You are right that SEP-5 requires user to understand and enforce
> > limitations across organizations. But it is still much better than not
> > allowing user to expansion partition for stateful jobs at all, right?
> Did I
> > miss something here?
> >
> > 2) ... Separate orgs are often difficult to coordinate and a system which
> > depends on such significant process/coordination is too fragile for my
> > taste ..
> >
> > This is true. Ideally we want a system that is fully self-serving. I
> think
> > this is a long term goal for Samza. Still, for the reasons described
> above,
> > I think something is better than nothing. I am open to alternative design
> > that can support partition expansion for stateful jobs without requiring
> > coordination.
> >
> > 3) There is currently no supported way of sharing state among the tasks
> of
> > a container.  Each task has its own isolated store and that logical
> > isolation is the primary thing that enables Samza jobs to scale with a
> > simple container count change. My feeling is that we should not change
> > this without
> > good reason.
> >
> > I see your point. I will remove this sentence from the motivation
> section.
> > This won't have any impact on the design of the SEP-5. Does this address
> > the problem?
> >
> > 4) With the current proposal, we'd also need a similar check for
> > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other
> > groupers
> > were later added with both these modes, we'd probably need to add those
> > too. It might be easier and cleaner to add a config to ignore that check
> > temporarily. Down side is that it further complicates the Samza config,
> > which is already huge. Thoughts?
> >
> > Yes, we need a similar check for GroupBySystemStreamPartitionWi
> > thFixedTaskNum
> > as well. If there is more grouper classes needed in the future, we can
> > solve this problem cleanly without new config. Given the
> > previousGrouperClass and newGrouperClass, KafkaCheckpointLogKey will
> throw
> > exception if and only if newGrouperClass is an instance of
> > previousGrouperClass.
> > GroupBySystemStreamPartitionWithFixedTaskNum should extend
> > GroupBySystemStreamPartition
> > and GroupByPartitionWithFixedTaskNum should extend GroupByPartition.
> Does
> > this address your concern?
> >
> > 5) The task-to-container and container-to-host mappings are both
> meaningful
> > in context of the JobModel. Partition-to-task mapping is not meaningful
> > without
> > some definition of the key-to-partition assignments. It's incomplete
> > information and therefore misleading. I think it only makes sense to use
> > this mapping if we adopt a solution wherein Samza also knows the
> partition
> > key assignment.
> >
> > Partition-to-task is currently explicitly passed from job coordinator to
> > each task as part of the job model to tell tasks which partitions to
> > consume from. I think we can store some definition of the
> key-to-partition
> > assignments if Samza decides to get and use this information in the
> > future. Can
> > you be more specific why Partition-to-task mapping is not meaningful
> > without
> > some definition of the key-to-partition assignments and why it is
> > incomplete and misleading?
> >
> >
> > Thanks,
> > Dong
> >
> > On Mon, Jun 12, 2017 at 3:54 PM, Jacob Maes <jacob.m...@gmail.com>
> wrote:
> >
> > > Hey Dong,
> > >
> > > I'm opposed (or a +0, at best) to this limited, Kafka-specific
> solution.
> > I
> > > understand that the proposal is relatively simple to implement, but I
> > think
> > > it will cause headaches for Samza users. They will not only have to
> > > understand all the limitations (increase only, double partitions only,
> > > partition using hash+modulo, etc) of this approach, but enforcing these
> > > limitations can be a major problem, especially when the Samza jobs and
> > > message brokers are managed by separate orgs in a company. Separate
> orgs
> > > are often difficult to coordinate and a system which depends on such
> > > significant process/coordination is too fragile for my taste.
> > >
> > > That said, I realize that my opinion is just one of many in the broader
> > > community which may feel differently, so let me respond to some of the
> > > other items in the discussion so we can clear them up:
> > >
> > > The task-to-container assignment matters because if the correlated
> tasks
> > > > (i.e. tasks that consume messages with the same key) needs to be in
> the
> > > > same container so that they can share the same key/value local store
> on
> > > the
> > > > same physical machine.
> > >
> > > There is currently no supported way of sharing state among the tasks
> of a
> > > container.  Each task has its own isolated store and that logical
> > isolation
> > > is the primary thing that enables Samza jobs to scale with a simple
> > > container count change. My feeling is that we should not change this
> > > without good reason.
> > >
> > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such
> > that
> > > > exception will not be thrown if new grouper is
> > > > GroupByPartitionWithFixedTaskNum and old grouper is
> GroupByPartition.
> > > Does
> > > > this look reasonable?
> > >
> > > With the current proposal, we'd also need a similar check for
> > > GroupBySystemStreamPartitionWithFixedTaskNum as well. And if any other
> > > groupers were later added with both these modes, we'd probably need to
> > add
> > > those too. It might be easier and cleaner to add a config to ignore
> that
> > > check temporarily. Down side is that it further complicates the Samza
> > > config, which is already huge. Thoughts?
> > >
> > > I think storing the previous task-to-partition mapping is more general
> > than
> > > > storing the partition count of all topics for the following reasons:
> > > > - Samza already stores the task-to-container mapping and
> > > container-to-host
> > > > mapping in the coordinator stream. It seems consistent to also store
> > the
> > > > partition-to-task mapping. And this information may be useful for
> other
> > > > use-case such as debugging.
> > > > - By having the new interface take the previous task-to-partition
> > > > assignment instead of a topic-to-partition-count mapping as new
> > > parameter,
> > > > we can potentially have grouper implementation to support other types
> > of
> > > > input systems.
> > > > - It is sightly simpler to store the task-to-partition assignment
> > because
> > > > we don't need to know whether this is the first time a job is started
> > or
> > > > not. On the other hand, you can write topic-to-partition-count
> mapping
> > to
> > > > the coordinator stream only if this is the first time the job is run
> > >
> > > The task-to-container and container-to-host mappings are both
> meaningful
> > in
> > > context of the JobModel. Partition-to-task mapping is not meaningful
> > > without some definition of the key-to-partition assignments. It's
> > > incomplete information and therefore misleading. I think it only makes
> > > sense to use this mapping if we adopt a solution wherein Samza also
> knows
> > > the partition key assignment.
> > >
> > > -Jake
> > >
> > > On Tue, Jun 6, 2017 at 11:06 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jacob,
> > > >
> > > > Thanks for taking time to review the SEP.
> > > >
> > > > I agree with you and Navina that the current SEP doesn't provide
> > support
> > > to
> > > > arbitrary input systems and it doesn't support partition shrink. I
> > think
> > > > the scope of this SEP is to support partition expansion for Kafka
> (the
> > > most
> > > > widely used input system of Samza) and keep the door open for
> partition
> > > > support of various input systems. The current design can support any
> > > system
> > > > that meets the two operational requirement specified in the doc.
> > > >
> > > > While it is possible to support more types of input systems, it will
> > > likely
> > > > add more complexity to the design. For example, the first alternative
> > > > solution from you requires broker-side support to negotiate hash
> > > algorithm.
> > > > The second alternative solution requires changelog partition
> reshuffle
> > > > which carries its own design complexity and performance overhead.
> There
> > > is
> > > > tradeoff between the generality and the complexity among these
> > choices. I
> > > > like the current design because it is simple and addresses a big
> usage
> > > > scenario for us. We can add more complexity to generalize the design
> if
> > > it
> > > > enables important use-case. Does this sound reasonable?
> > > >
> > > > Note that the "Rejected Alternative" section also mentions the
> > > possibility
> > > > of supporting a wider range of input systems by allowing user to
> > specify
> > > > the new-partition to old-partition mapping. We are not doing it
> because
> > > 1)
> > > > we may have better understanding of the design after we have a
> specific
> > > > second input system to support 2) the current design can be extended
> to
> > > > support general input systems. I think similar argument can be
> applied
> > > > explain why we don't have to support general input systems using the
> > > > potentially-good alternatives you mentioned.
> > > >
> > > > I hope SEP-5 can be an important first-step towards supporting
> > partition
> > > > expansion of any input system.
> > > >
> > > > To answer your questions about the current proposal:
> > > >
> > > > >1. "An alternative solution is to allow task number to increase
> after
> > > > >partition expansion and uses a proper task-to-container assignment
> to
> > > make
> > > > >sure the Samza output is correct." What does the container have to
> do
> > > with
> > > > >stateful processing or output in general?
> > > >
> > > > The task-to-container assignment matters because if the correlated
> > tasks
> > > > (i.e. tasks that consume messages with the same key) needs to be in
> the
> > > > same container so that they can share the same key/value local store
> on
> > > the
> > > > same physical machine.
> > > >
> > > > >2. When you use "Join" as an example, you basically mean multiple
> > > > >co-partitioned streams, right? This is opposed to multiple,
> > > > >independently-partitioned streams or a single stream. Would be nice
> to
> > > > >formulate the proposal in these more general terms.
> > > >
> > > > I thought "join" is a commonly used to refer to the join opeartion
> with
> > > > co-partitioned stream but I may be wrong. I have updated the wiki to
> > > > explicitly mention "co-partitioned stream". Does this look better
> now?
> > > >
> > > > >3. When switching SSP groupers, how will the users avoid the
> > > > >org.apache.samza.checkpoint.kafka.DifferingSystemStreamPartition
> > > > GrouperFactoryValues
> > > > >exception?
> > > >
> > > > I think we can hardcode new logic in KafkaCheckpointLogKey.scala such
> > > that
> > > > exception will not be thrown if new grouper is
> > > > GroupByPartitionWithFixedTaskNum and old grouper is
> GroupByPartition.
> > > Does
> > > > this look reasonable?
> > > >
> > > > >4. Partition to task assignment is meaningless without key to
> > partition
> > > > >mapping. The real semantics are captured in the external requirement
> > for
> > > > >partitioning via hash+modulo. But in that case, iiuc, only the
> > partition
> > > > >count matters. So why not just store the original partition count
> > rather
> > > > >than the whole mapping?
> > > >
> > > > I think storing the previous task-to-partition mapping is more
> general
> > > than
> > > > storing the partition count of all topics for the following reasons:
> > > >
> > > > - Samza already stores the task-to-container mapping and
> > > container-to-host
> > > > mapping in the coordinator stream. It seems consistent to also store
> > the
> > > > partition-to-task mapping. And this information may be useful for
> other
> > > > use-case such as debugging.
> > > >
> > > > - By having the new interface take the previous task-to-partition
> > > > assignment instead of a topic-to-partition-count mapping as new
> > > parameter,
> > > > we can potentially have grouper implementation to support other types
> > of
> > > > input systems.
> > > >
> > > > - It is sightly simpler to store the task-to-partition assignment
> > because
> > > > we don't need to know whether this is the first time a job is started
> > or
> > > > not. On the other hand, you can write topic-to-partition-count
> mapping
> > to
> > > > the coordinator stream only if this is the first time the job is run
> > > >
> > > > Thanks,
> > > > Dong
> > > >
> > > > On Mon, Jun 5, 2017 at 11:17 AM, Jacob Maes <jacob.m...@gmail.com>
> > > wrote:
> > > >
> > > > > Hey Dong,
> > > > >
> > > > > Thanks for the SEP. Supporting partition changes is critically
> > > important
> > > > > for stateful Samza jobs, so it's great to see some ideas on that
> > front!
> > > > >
> > > > > Sorry for the late feedback, but I have a few thoughts to
> contribute.
> > > > >
> > > > > Big +1 on Navina's comment:
> > > > >
> > > > > > My biggest gripe with this SEP is that it seems like a
> tailor-made
> > > > > > solution
> > > > > > that relies on the semantics of the Kafka system and yet, we are
> > > trying
> > > > > to
> > > > > > masquerade that as operational requirements for other systems
> > > > interacting
> > > > > > with Samza. (Not to say that this is the first time such a choice
> > is
> > > > > being
> > > > > > made in the Samza design). I am not seeing how this can a
> "general"
> > > > > > solution for all input systems. That's my two cents. I would like
> > to
> > > > hear
> > > > > > alternative points of view for this from other devs.
> > > > >
> > > > >
> > > > > Two examples of this:
> > > > > 1. This is mostly a hypothetical, but some message brokers may use
> > key
> > > > > range assignment rather than hash+modulo.
> > > > > 2. Kafka can't reduce the number of partitions, but it can happen
> on
> > > > other
> > > > > systems. For example, it may be cheaper to reduce the number of
> > > > partitions
> > > > > on a hosted service where the cost model depends on the number of
> > > > > partitions/shards.
> > > > >
> > > > > It seems to me that a solution which doesn't depend on partition
> key
> > > > > assignment in the message broker. Here are a few alternatives that
> > > > weren't
> > > > > discussed and I think should be considered:
> > > > >
> > > > > Alternatives in order of increasing preference:
> > > > > 1. Samza manages the partition hash (via some new contract with the
> > > > > brokers) and guarantees correct routing of keys among the new
> > > partitions.
> > > > > 2. Samza detects a task count change, creates a new changelog with
> > > > correct
> > > > > partitions, and *somehow* reshuffles all existing changelog data
> into
> > > the
> > > > > new topic and then uses the new topic from then on. (doesn't work
> > > without
> > > > > changelog, but in that case durability isn't paramount, so we can
> > just
> > > > > wipe)
> > > > > 3. Use RPC in between stages and samza fully manages key assignment
> > > among
> > > > > tasks. No on-disk topic data to clean up. Mandatory repartitioning
> in
> > > the
> > > > > first stage to pre-scaled tasks in next stage.
> > > > > 4. Combined 2-3 solution
> > > > >
> > > > > Finally, some questions about the current proposal:
> > > > > 1. "An alternative solution is to allow task number to increase
> after
> > > > > partition expansion and uses a proper task-to-container assignment
> to
> > > > make
> > > > > sure the Samza output is correct." What does the container have to
> do
> > > > with
> > > > > stateful processing or output in general?
> > > > > 2. When you use "Join" as an example, you basically mean multiple
> > > > > co-partitioned streams, right? This is opposed to multiple,
> > > > > independently-partitioned streams or a single stream. Would be nice
> > to
> > > > > formulate the proposal in these more general terms.
> > > > > 3. When switching SSP groupers, how will the users avoid the
> > > > > org.apache.samza.checkpoint.kafka.DifferingSystemStreamParti
> > > > > tionGrouperFactoryValues
> > > > > exception?
> > > > > 4. Partition to task assignment is meaningless without key to
> > partition
> > > > > mapping. The real semantics are captured in the external
> requirement
> > > for
> > > > > partitioning via hash+modulo. But in that case, iiuc, only the
> > > partition
> > > > > count matters. So why not just store the original partition count
> > > rather
> > > > > than the whole mapping?
> > > > >
> > > > > -Jake
> > > > >
> > > > > On Sun, Jun 4, 2017 at 12:32 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > > >
> > > > > > Hey Yi, Navina,
> > > > > >
> > > > > > I have updated the SEP-5 document based on our discussion. The
> > > > difference
> > > > > > can be found here
> > > > > > <https://cwiki.apache.org/confluence/pages/
> > diffpagesbyversion.action
> > > ?
> > > > > > pageId=70255476&selectedPageVersions=14&selectedPageVersions
> =15>.
> > > > > > Here is the summary of changes:
> > > > > >
> > > > > > - Add new interface that extends the existing interface
> > > > > > SystemStreamPartitionGrouper. Newly-added grouper class should
> > > > implement
> > > > > > this interface.
> > > > > > - Explained in the Rejected Alternative Section why we don't add
> > new
> > > > > method
> > > > > > in the existing interface
> > > > > > - Explained in the Rejected Alternative Section why we don't
> > > > config/class
> > > > > > for user to specify new-partition to old-partition mapping.
> > > > > >
> > > > > > Can you take another look at the proposal and let me know if
> there
> > is
> > > > any
> > > > > > concern?
> > > > > >
> > > > > > Cheers,
> > > > > > Dong
> > > > > >
> > > > > >
> > > > > > On Thu, Jun 1, 2017 at 12:58 AM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > > > >
> > > > > > > Hey Yi,
> > > > > > >
> > > > > > > Thanks much for the comment. I have updated the doc to address
> > all
> > > > your
> > > > > > > comments except the one related to the interface. I am not
> sure I
> > > > > > > understand your suggestion of the new interface. Will discuss
> > > > tomorrow.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Dong
> > > > > > >
> > > > > > > On Wed, May 31, 2017 at 4:29 PM, Yi Pan <nickpa...@gmail.com>
> > > wrote:
> > > > > > >
> > > > > > >> Hi, Don,
> > > > > > >>
> > > > > > >> Thanks for the detailed design doc for a long-waited feature
> in
> > > > Samza!
> > > > > > >> Really appreciate it! I did a quick pass and have the
> following
> > > > > > comments:
> > > > > > >>
> > > > > > >> - minor: "limit the maximum size of partition" ==> "limit the
> > > > maximum
> > > > > > size
> > > > > > >> of each partition"
> > > > > > >> - "However, Samza currently is not able to handle partition
> > > > expansion
> > > > > of
> > > > > > >> the input streams"==>better point out "for stateful jobs". For
> > > > > stateless
> > > > > > >> jobs, simply bouncing the job now can pick up the new
> > partitions.
> > > > > > >> - "it is possible (e.g. with Kafka) that messages with a given
> > key
> > > > > > exists
> > > > > > >> in both partition 1 an 3. Because GroupByPartition will assign
> > > > > > partition 1
> > > > > > >> and 3 to different tasks, messages with the same key may be
> > > handled
> > > > by
> > > > > > >> different task/container/process and their state will be
> stored
> > in
> > > > > > >> different changelog partition." The problem statement is not
> > super
> > > > > clear
> > > > > > >> here. The issues with stateful jobs is: after GroupByPartition
> > > > assign
> > > > > > >> partition 1 and 3 to different tasks, the new task handling
> > > > partition
> > > > > 3
> > > > > > >> does not have the previous state to resume the work. e.g. a
> > > page-key
> > > > > > based
> > > > > > >> counter would start from 0 in the new task for a specific key,
> > > > instead
> > > > > > of
> > > > > > >> resuming the previous count 50 held by task 1.
> > > > > > >> - minor rewording: "the first solution in this doc" ==> "the
> > > > solution
> > > > > > >> proposed in this doc"
> > > > > > >> - "Thus additional development work is needed in Kafka to meet
> > > this
> > > > > > >> requirement" It would be good to link to a KIP if and when it
> > > exists
> > > > > > >> - Instead of touching/deprecating the interface
> > > > > > >> SystemStreamPartitionGrouper, I would recommend to have a
> > > different
> > > > > > >> implementation class of the interface, which in the
> constructor
> > of
> > > > the
> > > > > > >> grouper, takes two parameters: a) the previous task number
> read
> > > from
> > > > > the
> > > > > > >> coordinator stream; b) the configured new-partition to
> > > old-partition
> > > > > > >> mapping policy. Then, the grouper's interface method stays the
> > > same
> > > > > and
> > > > > > >> the
> > > > > > >> behavior of the grouper is more configurable which is good to
> > > > support
> > > > > a
> > > > > > >> broader set of use cases in addition to Kafka's built-in
> > partition
> > > > > > >> expansion policies.
> > > > > > >> - Minor renaming suggestion to the new grouper class names:
> > > > > > >> GroupByPartitionWithFixedTaskNum
> > > > > > >> and GroupBySystemStreamPartitionWithFixedTaskNum
> > > > > > >>
> > > > > > >> Thanks!
> > > > > > >>
> > > > > > >> - Yi
> > > > > > >>
> > > > > > >> On Wed, May 31, 2017 at 10:33 AM, Dong Lin <
> lindon...@gmail.com
> > >
> > > > > wrote:
> > > > > > >>
> > > > > > >> > Hey Navina,
> > > > > > >> >
> > > > > > >> > Thanks much for the comment. Please see my response below.
> > > > > > >> >
> > > > > > >> > Regarding your biggest gripe with the SEP, I personally
> think
> > > the
> > > > > > >> > operational requirement proposed in the KIP are pretty
> general
> > > and
> > > > > > >> could be
> > > > > > >> > easily enforced by other systems. The reason is that the
> > module
> > > > > > >> operation
> > > > > > >> > is pretty standard and the default option when we choose
> > > > partition.
> > > > > > And
> > > > > > >> > usually the underlying system allows user to select
> arbitrary
> > > > > > partition
> > > > > > >> > number if it supports partition expansion. Do you know any
> > > system
> > > > > that
> > > > > > >> does
> > > > > > >> > not meet these two requirement?
> > > > > > >> >
> > > > > > >> > Regarding your comment of the Motivation section, I renamed
> > the
> > > > > first
> > > > > > >> > section as "Problem and Goal" and specified that "*The goal
> of
> > > > this
> > > > > > >> > proposal is to enable partition expansion of the input
> > > > streams*.". I
> > > > > > >> also
> > > > > > >> > put a sentence at the end of the Motivation section that
> "*The
> > > > > feature
> > > > > > >> of
> > > > > > >> > task expansion is out of the scope of this proposal and will
> > be
> > > > > > >> addressed
> > > > > > >> > in a future SEP*". The second paragraph in the Motivation
> > > section
> > > > is
> > > > > > >> mainly
> > > > > > >> > used to explain the thinking process that we have gone
> > through,
> > > > what
> > > > > > >> other
> > > > > > >> > alternative we have considered, and we plan to do in Samza
> in
> > > the
> > > > > nex
> > > > > > >> step.
> > > > > > >> >
> > > > > > >> > To answer your question why increasing the partition number
> > will
> > > > > > >> increase
> > > > > > >> > the throughput of the kafka consumer in the container, Kafka
> > > > > consumer
> > > > > > >> can
> > > > > > >> > potentially fetch more data in one FetchResponse with more
> > > > > partitions
> > > > > > in
> > > > > > >> > the FetchRequest. This is because we limit the maximum
> amount
> > of
> > > > > data
> > > > > > >> that
> > > > > > >> > can be fetch for a given partition in the FetchResponse.
> This
> > by
> > > > > > >> default is
> > > > > > >> > set to 1 MB. And there is reason that we can not arbitrarily
> > > bump
> > > > up
> > > > > > >> this
> > > > > > >> > limit.
> > > > > > >> >
> > > > > > >> > To answer your question how partition expansion in Kafka
> > impacts
> > > > the
> > > > > > >> > clients, Kafka consumer is able to automatically detect new
> > > > > partition
> > > > > > of
> > > > > > >> > the topic and reassign all (both old and new) partitions
> > across
> > > > > > >> consumers
> > > > > > >> > in the consumer group IF you tell consumer the topic to be
> > > > > subscribed.
> > > > > > >> But
> > > > > > >> > consumer in Samza's container uses another way of
> > subscription.
> > > > > > Instead
> > > > > > >> of
> > > > > > >> > subscribing to the topic, the consumer in Samza's container
> > > > > subscribes
> > > > > > >> to
> > > > > > >> > the specific partitions of the topic. In this case, if new
> > > > > partitions
> > > > > > >> have
> > > > > > >> > been added, Samza will need to explicitly subscribe to the
> new
> > > > > > >> partitions
> > > > > > >> > of the topic. The "Handle partition expansion while tasks
> are
> > > > > running"
> > > > > > >> > section in the SEP addresses this issue in Samza -- it
> > > > recalculates
> > > > > > the
> > > > > > >> job
> > > > > > >> > model and restart container so that consumer can subscribe
> to
> > > the
> > > > > new
> > > > > > >> > partitions.
> > > > > > >> >
> > > > > > >> > I will ask other dev to take a look at the proposal. I will
> > > start
> > > > > the
> > > > > > >> > voting thread tomorrow if there is no further concern with
> the
> > > > SEP.
> > > > > > >> >
> > > > > > >> > Thanks!
> > > > > > >> > Dong
> > > > > > >> >
> > > > > > >> >
> > > > > > >> > On Wed, May 31, 2017 at 12:01 AM, Navina Ramesh (Apache) <
> > > > > > >> > nav...@apache.org>
> > > > > > >> > wrote:
> > > > > > >> >
> > > > > > >> > > Hey Dong,
> > > > > > >> > >
> > > > > > >> > > >  I have updated the motivation section to clarify this.
> > > > > > >> > >
> > > > > > >> > > Thanks for updating the motivation. Couple of notes here:
> > > > > > >> > >
> > > > > > >> > > 1.
> > > > > > >> > > > "The motivation of increasing partition number of Kafka
> > > topic
> > > > > > >> includes
> > > > > > >> > 1)
> > > > > > >> > > limit the maximum size of a partition in order to improve
> > > broker
> > > > > > >> > > performance and 2) increase throughput of Kafka consumer
> in
> > > the
> > > > > > Samza
> > > > > > >> > > container."
> > > > > > >> > >
> > > > > > >> > > It's unclear to me how increasing the partition number
> will
> > > > > increase
> > > > > > >> the
> > > > > > >> > > throughput of the kafka consumer in the container?
> > > > Theoretically,
> > > > > > you
> > > > > > >> > will
> > > > > > >> > > still be consuming the same amount of data in the
> container,
> > > > > > >> irrespective
> > > > > > >> > > of whether it is coming from one partition or more than
> one
> > > > > expanded
> > > > > > >> > > partitions. Can you please explain it for me here, what
> you
> > > mean
> > > > > by
> > > > > > >> that?
> > > > > > >> > >
> > > > > > >> > > 2. I believe the second paragraph under motivation is
> simply
> > > > > talking
> > > > > > >> > about
> > > > > > >> > > the scope of the current SEP. It will be easier to read if
> > > what
> > > > > > >> solution
> > > > > > >> > is
> > > > > > >> > > included in this SEP and what is left out as not in scope.
> > > (for
> > > > > > >> example,
> > > > > > >> > > expansions for stateful jobs is supported or not).
> > > > > > >> > >
> > > > > > >> > > > We need to persist the task-to-sspList mapping in the
> > > > > > >> > > coordinator stream so that the job can derive the original
> > > > number
> > > > > of
> > > > > > >> > > partitions of each input stream regardless of how many
> times
> > > the
> > > > > > >> > partition
> > > > > > >> > > has expanded. Does this make sense?
> > > > > > >> > >
> > > > > > >> > > Yes. It does!
> > > > > > >> > >
> > > > > > >> > > > I am not sure how this is related to the locality
> though.
> > > Can
> > > > > you
> > > > > > >> > clarify
> > > > > > >> > > your question if I haven't answered your question?
> > > > > > >> > >
> > > > > > >> > > It's not related. I just meant to give an example of yet
> > > another
> > > > > > >> > > coordinator message that is persisted. Your ssp-to-task
> > > mapping
> > > > is
> > > > > > >> > > following a similar pattern for persisting. Just wanted to
> > > > clarify
> > > > > > >> that.
> > > > > > >> > >
> > > > > > >> > > > Can you let me know if this, together with the answers
> in
> > > the
> > > > > > >> previous
> > > > > > >> > > email, addresses all your questions?
> > > > > > >> > >
> > > > > > >> > > Yes. I believe you have addressed most of my questions.
> > Thanks
> > > > for
> > > > > > >> taking
> > > > > > >> > > time to do that.
> > > > > > >> > >
> > > > > > >> > > > Is there specific question you have regarding partition
> > > > > > >> > > expansion in Kafka?
> > > > > > >> > >
> > > > > > >> > > I guess my questions are on how partition expansion in
> Kafka
> > > > > impacts
> > > > > > >> the
> > > > > > >> > > clients. Iiuc, partition expansions are done manually in
> > Kafka
> > > > > based
> > > > > > >> on
> > > > > > >> > the
> > > > > > >> > > bytes-in rate of the partition. Do the existing kafka
> > clients
> > > > > handle
> > > > > > >> this
> > > > > > >> > > expansion automatically? if yes, how does it work? If not,
> > are
> > > > > there
> > > > > > >> > plans
> > > > > > >> > > to support it in the future?
> > > > > > >> > >
> > > > > > >> > > > Thus user's job should not need to bootstrap key/value
> > store
> > > > > from
> > > > > > >> the
> > > > > > >> > > changelog topic.
> > > > > > >> > >
> > > > > > >> > > Why is this discussion relevant here? Key/value store /
> > > > changelog
> > > > > > >> topic
> > > > > > >> > > partition is scoped with the context of a task. Since we
> are
> > > not
> > > > > > >> changing
> > > > > > >> > > the number of tasks, I don't think it is required to
> mention
> > > it
> > > > > > here.
> > > > > > >> > >
> > > > > > >> > > > The new method takes the SystemStreamPartition-to-Task
> > > > > assignment
> > > > > > >> from
> > > > > > >> > > the previous job model which can be read from the
> > coordinator
> > > > > > stream.
> > > > > > >> > >
> > > > > > >> > > Jobmodel is currently not persisted to coordinator stream.
> > In
> > > > your
> > > > > > >> > design,
> > > > > > >> > > you talk about writing separate coordinator messages for
> > > > > ssp-to-task
> > > > > > >> > > assignments. Hence, please correct this statement. It is
> > kind
> > > of
> > > > > > >> > misleading
> > > > > > >> > > to the reader.
> > > > > > >> > >
> > > > > > >> > > My biggest gripe with this SEP is that it seems like a
> > > > tailor-made
> > > > > > >> > solution
> > > > > > >> > > that relies on the semantics of the Kafka system and yet,
> we
> > > are
> > > > > > >> trying
> > > > > > >> > to
> > > > > > >> > > masquerade that as operational requirements for other
> > systems
> > > > > > >> interacting
> > > > > > >> > > with Samza. (Not to say that this is the first time such a
> > > > choice
> > > > > is
> > > > > > >> > being
> > > > > > >> > > made in the Samza design). I am not seeing how this can a
> > > > > "general"
> > > > > > >> > > solution for all input systems. That's my two cents. I
> would
> > > > like
> > > > > to
> > > > > > >> hear
> > > > > > >> > > alternative points of view for this from other devs.
> > > > > > >> > >
> > > > > > >> > > Please make sure you have enough eyes on this SEP. If you
> > do,
> > > > > please
> > > > > > >> > start
> > > > > > >> > > a VOTE thread to approve this SEP.
> > > > > > >> > >
> > > > > > >> > > Thanks!
> > > > > > >> > > Navina
> > > > > > >> > >
> > > > > > >> > >
> > > > > > >> > > On Mon, May 29, 2017 at 12:32 AM, Dong Lin <
> > > lindon...@gmail.com
> > > > >
> > > > > > >> wrote:
> > > > > > >> > >
> > > > > > >> > > > Hey Navina,
> > > > > > >> > > >
> > > > > > >> > > > I have updated the wiki based on your suggestion. More
> > > > > > >> specifically, I
> > > > > > >> > > have
> > > > > > >> > > > made the following changes:
> > > > > > >> > > >
> > > > > > >> > > > - Improved Problem section and Motivation section to
> > > describe
> > > > > why
> > > > > > we
> > > > > > >> > use
> > > > > > >> > > > the solution in this proposal instead of tackling the
> > > problem
> > > > of
> > > > > > >> task
> > > > > > >> > > > expansion directly.
> > > > > > >> > > >
> > > > > > >> > > > - Illustrate the design in a way that doesn't bind to
> > Kafka.
> > > > > Kafka
> > > > > > >> is
> > > > > > >> > > only
> > > > > > >> > > > used as example to illustrate why we want to expand
> > > partition
> > > > > > >> expansion
> > > > > > >> > > and
> > > > > > >> > > > whether the operational requirement can be supported
> when
> > > > Kafka
> > > > > is
> > > > > > >> used
> > > > > > >> > > as
> > > > > > >> > > > the input system. Note that the proposed solution should
> > > work
> > > > > for
> > > > > > >> any
> > > > > > >> > > input
> > > > > > >> > > > system that meets the operational requirement described
> in
> > > the
> > > > > > wiki.
> > > > > > >> > > >
> > > > > > >> > > > - Fixed the problem in the figure.
> > > > > > >> > > >
> > > > > > >> > > > - Added a new class GroupBySystemStreamPartitionFi
> > > xedTaskNum
> > > > to
> > > > > > the
> > > > > > >> > > wiki.
> > > > > > >> > > > Together with GroupByPartitionFixedTaskNum, it should
> > ensure
> > > > > that
> > > > > > we
> > > > > > >> > > have a
> > > > > > >> > > > solution to enable partition expansion for all users
> that
> > > are
> > > > > > using
> > > > > > >> > > > pre-defined grouper in Samza. Note that those users who
> > use
> > > > > custom
> > > > > > >> > > grouper
> > > > > > >> > > > would need to update their implementation.
> > > > > > >> > > >
> > > > > > >> > > > Can you let me know if this, together with the answers
> in
> > > the
> > > > > > >> previous
> > > > > > >> > > > email, addresses all your questions? Thanks for taking
> > time
> > > to
> > > > > > >> review
> > > > > > >> > the
> > > > > > >> > > > proposal.
> > > > > > >> > > >
> > > > > > >> > > > Regards,
> > > > > > >> > > > Dong
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > > > On Wed, May 24, 2017 at 11:15 PM, Dong Lin <
> > > > lindon...@gmail.com
> > > > > >
> > > > > > >> > wrote:
> > > > > > >> > > >
> > > > > > >> > > > > Hey Navina,
> > > > > > >> > > > >
> > > > > > >> > > > > Thanks much for your comments. Please see my reply
> > inline.
> > > > > > >> > > > >
> > > > > > >> > > > > On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh
> > (Apache) <
> > > > > > >> > > > > nav...@apache.org> wrote:
> > > > > > >> > > > >
> > > > > > >> > > > >> Thanks for the SEP, Dong. I have a couple of
> questions
> > to
> > > > > > >> understand
> > > > > > >> > > > your
> > > > > > >> > > > >> proposal better:
> > > > > > >> > > > >>
> > > > > > >> > > > >> * Under motivation, you mention that "_We expect this
> > > > > solution
> > > > > > to
> > > > > > >> > work
> > > > > > >> > > > >> similarly with other input system as well._", yet I
> > don't
> > > > see
> > > > > > any
> > > > > > >> > > > >> discussion on how it will work with other input
> > systems.
> > > > That
> > > > > > is,
> > > > > > >> > what
> > > > > > >> > > > >> kind
> > > > > > >> > > > >> of contract does samza expect from other input
> systems
> > ?
> > > If
> > > > > we
> > > > > > >> are
> > > > > > >> > not
> > > > > > >> > > > >> planning to provide a generic solution, it might be
> > worth
> > > > > > >> calling it
> > > > > > >> > > out
> > > > > > >> > > > >> in
> > > > > > >> > > > >> the SEP.
> > > > > > >> > > > >>
> > > > > > >> > > > >
> > > > > > >> > > > > I think the contract we expect from other systems are
> > > > exactly
> > > > > > the
> > > > > > >> > > > > operational requirement mentioned in the SEP, i.e.
> > > > partitions
> > > > > > >> should
> > > > > > >> > > > always
> > > > > > >> > > > > be doubled and the hash algorithm should module the
> > number
> > > > of
> > > > > > >> > > partitions.
> > > > > > >> > > > > SEP-5 should also allow partition expansion of all
> input
> > > > > systems
> > > > > > >> that
> > > > > > >> > > > meet
> > > > > > >> > > > > these two requirements. I have updated the motivation
> > > > section
> > > > > to
> > > > > > >> > > clarify
> > > > > > >> > > > > this.
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >>
> > > > > > >> > > > >> * I understand the partition mapping logic you have
> > > > proposed.
> > > > > > >> But I
> > > > > > >> > > > think
> > > > > > >> > > > >> the example explanation doesn't match the diagram. In
> > the
> > > > > > >> diagram,
> > > > > > >> > > after
> > > > > > >> > > > >> expansion, partiion-0 and partition-1 are pointing to
> > > > bucket
> > > > > 0
> > > > > > >> and
> > > > > > >> > > > >> partition-3 and partition-4 are pointing to bucket
> 1. I
> > > > think
> > > > > > the
> > > > > > >> > > former
> > > > > > >> > > > >> has to be partition-0 and partition-2 and the latter,
> > is
> > > > > > >> partition-1
> > > > > > >> > > and
> > > > > > >> > > > >> partition-3. If I am wrong, please help me understand
> > the
> > > > > logic
> > > > > > >> :)
> > > > > > >> > > > >>
> > > > > > >> > > > >
> > > > > > >> > > > > Good catch. I will update the figure to fix this
> > problem.
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >>
> > > > > > >> > > > >> * I don't know how partition expansion in Kafka
> works.
> > I
> > > am
> > > > > > >> familiar
> > > > > > >> > > > with
> > > > > > >> > > > >> how shard splitting happens in Kinesis - there is
> > > > > hierarchical
> > > > > > >> > > relation
> > > > > > >> > > > >> between the parent and child shards. This way, it
> will
> > > also
> > > > > > allow
> > > > > > >> > the
> > > > > > >> > > > >> shards to be merged back. Iiuc, Kafka only supports
> > > > partition
> > > > > > >> > > > "expansion",
> > > > > > >> > > > >> as opposed to "splits". Can you provide some context
> or
> > > > link
> > > > > > >> related
> > > > > > >> > > to
> > > > > > >> > > > >> how
> > > > > > >> > > > >> partition expansion works in Kafka?
> > > > > > >> > > > >>
> > > > > > >> > > > >
> > > > > > >> > > > > I couldn't find any wiki on partition expansion in
> > Kafka.
> > > > The
> > > > > > >> > partition
> > > > > > >> > > > > expansion logic in Kafka is very simply -- it simply
> > adds
> > > > new
> > > > > > >> > partition
> > > > > > >> > > > to
> > > > > > >> > > > > the existing topic. Is there specific question you
> have
> > > > > > regarding
> > > > > > >> > > > partition
> > > > > > >> > > > > expansion in Kafka?
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >>
> > > > > > >> > > > >> * Are you only recommending that expansion can be
> > > supported
> > > > > for
> > > > > > >> > samza
> > > > > > >> > > > jobs
> > > > > > >> > > > >> that use Kafka as input systems **and** configure the
> > > > > > SSPGrouper
> > > > > > >> as
> > > > > > >> > > > >> GroupByPartitionFixedTaskNum? Sounds to me like this
> > only
> > > > > > applies
> > > > > > >> > for
> > > > > > >> > > > >> GroupByPartition. Please correct me if I am wrong.
> What
> > > is
> > > > > the
> > > > > > >> > > > expectation
> > > > > > >> > > > >> for custom SSP Groupers?
> > > > > > >> > > > >>
> > > > > > >> > > > >
> > > > > > >> > > > > The expansion can be supported for Samza jobs if the
> > input
> > > > > > system
> > > > > > >> > meets
> > > > > > >> > > > > the operational requirement mentioned above. It
> doesn't
> > > have
> > > > > to
> > > > > > >> use
> > > > > > >> > > Kafka
> > > > > > >> > > > > as input system.
> > > > > > >> > > > >
> > > > > > >> > > > > The current proposal provided solution for jobs that
> > > > currently
> > > > > > use
> > > > > > >> > > > > GroupByPartition. The proposal can be extended to
> > support
> > > > jobs
> > > > > > >> that
> > > > > > >> > use
> > > > > > >> > > > > other grouper that are pre-defined in Samza. The
> custom
> > > SSP
> > > > > > >> grouper
> > > > > > >> > > needs
> > > > > > >> > > > > to handle partition expansion similar to how
> > > > > > >> > > GroupByPartitionFixedTaskNum
> > > > > > >> > > > > handles it and it is users' responsibility to update
> > their
> > > > > > custom
> > > > > > >> > > grouper
> > > > > > >> > > > > implementation.
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >>
> > > > > > >> > > > >> * Regarding storing SSP-to-Task assignment to
> > coordinator
> > > > > > stream:
> > > > > > >> > > Today,
> > > > > > >> > > > >> the JobModel encapsulates the data model in samza
> which
> > > > also
> > > > > > >> > includes
> > > > > > >> > > > >> **TaskModels**. TaskModel, typically shows the
> > > > > task-to-sspList
> > > > > > >> > > mapping.
> > > > > > >> > > > >> What is the reason for using a separate coordinator
> > > stream
> > > > > > >> message
> > > > > > >> > > > >> *SetSSPTaskMapping*? Is it because the JobModel
> itself
> > is
> > > > not
> > > > > > >> > > persisted
> > > > > > >> > > > in
> > > > > > >> > > > >> the coordinator stream today?  The reason locality
> > exists
> > > > > > >> outside of
> > > > > > >> > > the
> > > > > > >> > > > >> jobmodel is because *locality* information is written
> > by
> > > > each
> > > > > > >> > > container,
> > > > > > >> > > > >> where as it is consumed only by the leader
> > > > jobcoordinator/AM.
> > > > > > In
> > > > > > >> > this
> > > > > > >> > > > >> case,
> > > > > > >> > > > >> the writer of the mapping information and the reader
> is
> > > > still
> > > > > > the
> > > > > > >> > > leader
> > > > > > >> > > > >> jobcoordinator/AM. So, I want to understand the
> > > motivation
> > > > > for
> > > > > > >> this
> > > > > > >> > > > >> choice.
> > > > > > >> > > > >>
> > > > > > >> > > > >
> > > > > > >> > > > > Yes, the reason for using a separate coordinate stream
> > > > message
> > > > > > is
> > > > > > >> > > because
> > > > > > >> > > > > the task-to-sspList mapping is not currently persisted
> > in
> > > > the
> > > > > > >> > > coordinator
> > > > > > >> > > > > stream. We wouldn't need to create this new stream
> > message
> > > > if
> > > > > > >> > JobModel
> > > > > > >> > > is
> > > > > > >> > > > > persisted. We need to persist the task-to-sspList
> > mapping
> > > in
> > > > > the
> > > > > > >> > > > > coordinator stream so that the job can derive the
> > original
> > > > > > number
> > > > > > >> of
> > > > > > >> > > > > partitions of each input stream regardless of how many
> > > times
> > > > > the
> > > > > > >> > > > partition
> > > > > > >> > > > > has expanded. Does this make sense?
> > > > > > >> > > > >
> > > > > > >> > > > > I am not sure how this is related to the locality
> > though.
> > > > Can
> > > > > > you
> > > > > > >> > > clarify
> > > > > > >> > > > > your question if I haven't answered your question?
> > > > > > >> > > > >
> > > > > > >> > > > > Thanks!
> > > > > > >> > > > > Dong
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > > >>
> > > > > > >> > > > >> Cheers!
> > > > > > >> > > > >> Navina
> > > > > > >> > > > >>
> > > > > > >> > > > >> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <
> > > > > lindon...@gmail.com
> > > > > > >
> > > > > > >> > > wrote:
> > > > > > >> > > > >>
> > > > > > >> > > > >> > Hi all,
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > We created SEP-5: Enable partition expansion of
> input
> > > > > > streams.
> > > > > > >> > > Please
> > > > > > >> > > > >> find
> > > > > > >> > > > >> > the SEP wiki in the link
> > > > > > >> > > > >> > https://cwiki.apache.org/
> > confluence/display/SAMZA/SEP-
> > > > > > >> > > > >> > 5%3A+Enable+partition+expansion+of+input+streams
> > > > > > >> > > > >> > .
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > You feedback is appreciated!
> > > > > > >> > > > >> >
> > > > > > >> > > > >> > Thanks,
> > > > > > >> > > > >> > Dong
> > > > > > >> > > > >> >
> > > > > > >> > > > >>
> > > > > > >> > > > >
> > > > > > >> > > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to