Hey Navina,

Thanks much for your comments. Please see my reply inline.

On Wed, May 24, 2017 at 10:22 AM, Navina Ramesh (Apache) <nav...@apache.org>
wrote:

> Thanks for the SEP, Dong. I have a couple of questions to understand your
> proposal better:
>
> * Under motivation, you mention that "_We expect this solution to work
> similarly with other input system as well._", yet I don't see any
> discussion on how it will work with other input systems. That is, what kind
> of contract does samza expect from other input systems ? If we are not
> planning to provide a generic solution, it might be worth calling it out in
> the SEP.
>

I think the contract we expect from other systems are exactly the
operational requirement mentioned in the SEP, i.e. partitions should always
be doubled and the hash algorithm should module the number of partitions.
SEP-5 should also allow partition expansion of all input systems that meet
these two requirements. I have updated the motivation section to clarify
this.


>
> * I understand the partition mapping logic you have proposed. But I think
> the example explanation doesn't match the diagram. In the diagram, after
> expansion, partiion-0 and partition-1 are pointing to bucket 0 and
> partition-3 and partition-4 are pointing to bucket 1. I think the former
> has to be partition-0 and partition-2 and the latter, is partition-1 and
> partition-3. If I am wrong, please help me understand the logic :)
>

Good catch. I will update the figure to fix this problem.


>
> * I don't know how partition expansion in Kafka works. I am familiar with
> how shard splitting happens in Kinesis - there is hierarchical relation
> between the parent and child shards. This way, it will also allow the
> shards to be merged back. Iiuc, Kafka only supports partition "expansion",
> as opposed to "splits". Can you provide some context or link related to how
> partition expansion works in Kafka?
>

I couldn't find any wiki on partition expansion in Kafka. The partition
expansion logic in Kafka is very simply -- it simply adds new partition to
the existing topic. Is there specific question you have regarding partition
expansion in Kafka?


>
> * Are you only recommending that expansion can be supported for samza jobs
> that use Kafka as input systems **and** configure the SSPGrouper as
> GroupByPartitionFixedTaskNum? Sounds to me like this only applies for
> GroupByPartition. Please correct me if I am wrong. What is the expectation
> for custom SSP Groupers?
>

The expansion can be supported for Samza jobs if the input system meets the
operational requirement mentioned above. It doesn't have to use Kafka as
input system.

The current proposal provided solution for jobs that currently use
GroupByPartition. The proposal can be extended to support jobs that use
other grouper that are pre-defined in Samza. The custom SSP grouper needs
to handle partition expansion similar to how GroupByPartitionFixedTaskNum
handles it and it is users' responsibility to update their custom grouper
implementation.


>
> * Regarding storing SSP-to-Task assignment to coordinator stream: Today,
> the JobModel encapsulates the data model in samza which also includes
> **TaskModels**. TaskModel, typically shows the task-to-sspList mapping.
> What is the reason for using a separate coordinator stream message
> *SetSSPTaskMapping*? Is it because the JobModel itself is not persisted in
> the coordinator stream today?  The reason locality exists outside of the
> jobmodel is because *locality* information is written by each container,
> where as it is consumed only by the leader jobcoordinator/AM. In this case,
> the writer of the mapping information and the reader is still the leader
> jobcoordinator/AM. So, I want to understand the motivation for this choice.
>

Yes, the reason for using a separate coordinate stream message is because
the task-to-sspList mapping is not currently persisted in the coordinator
stream. We wouldn't need to create this new stream message if JobModel is
persisted. We need to persist the task-to-sspList mapping in the
coordinator stream so that the job can derive the original number of
partitions of each input stream regardless of how many times the partition
has expanded. Does this make sense?

I am not sure how this is related to the locality though. Can you clarify
your question if I haven't answered your question?

Thanks!
Dong


>
> Cheers!
> Navina
>
> On Tue, May 23, 2017 at 5:45 PM, Dong Lin <lindon...@gmail.com> wrote:
>
> > Hi all,
> >
> > We created SEP-5: Enable partition expansion of input streams. Please
> find
> > the SEP wiki in the link
> > https://cwiki.apache.org/confluence/display/SAMZA/SEP-
> > 5%3A+Enable+partition+expansion+of+input+streams
> > .
> >
> > You feedback is appreciated!
> >
> > Thanks,
> > Dong
> >
>

Reply via email to