Linking Boris' earlier comments to the correct [DISCUSSION] thread:
http://mail-archives.apache.org/mod_mbox/samza-dev/201801.mbox/%3CCAPAaT%2BtH2H5TEvFQUn9jw6iR%3DyvVEu46rDLJsqexpwKz0CAH1g%40mail.gmail.com%3E

On Thu, Feb 1, 2018 at 5:27 PM, Yi Pan <nickpa...@gmail.com> wrote:

> Hi, Santhoosh,
>
> Thanks for the SEP and the latest revisions. Here are some of my comments
> based on the latest proposal:
>
> - The basic idea for implementing state-aware task-to-physical-process
> assignment in the JobModel is not quite clear. ContainerAllocator solves a
> different problem in host-affinity: it tries to place the physical process
> on the same physical host/resource, while the JobModel performs the
> grouping of tasks into container processes. In YARN, we need to solve both
> affinity problems in code: ContainerAllocator places the container on a
> specific physical host, and the TaskNameGrouper with the balance() method
> minimizes task movement, assuming ContainerAllocator succeeds. In
> standalone, we only need to solve the assignment of tasks to physical
> hosts. So, I think the main mismatch is:
>
>    a) Standalone does not need/rely on ContainerAllocator to associate
> containers with physical hosts. That association is already done and
> reported by the containers in a job
>
>    b) The current TaskNameGrouper with the balance() method only considers
> the existing task-to-container mapping, and assumes another layer of
> container-to-physical-resource mapping is handled somewhere else. Hence, it
> would be nicer to unify the balance() method to directly consider the
> existing task-to-physical-resource mapping, which satisfies both YARN and
> standalone deployments
>
>
> Having said that, I think we will need the following in the JobModel
> generator:
>
> 1) ContainerInfo: (containerId, physicalResourceId)
>
>     * this mapping is the mapping reported by container processes in
> standalone, and the preferred mapping in YARN
>
> 2) TaskLocality: (taskId, physicalResourceId)
>
>     * this mapping is the actual task location reported by container
> processes in both standalone and YARN
>
> Then, the TaskNameGrouper.group() method tries its best to group the
> tasks into containerIds w/ the same physicalResourceId in a balanced
> pattern, in both YARN and standalone.
>
> In YARN, the JobModel will be used by ContainerAllocator to achieve the
> preferred container-to-physical-resource mapping, while in standalone, the
> JobModel will be directly used by container processes to start processing.
>
>
> - It is not quite clear to me what the distributed barrier is used for in
> the graph. Is it for every container process to pick up a certain version
> of the JobModel? Who is waiting on the barrier? The leader or the
> followers? Or everyone?
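>
> For example, if the intent is a ZooKeeper barrier recipe along the
> lines of Curator's DistributedBarrier (an assumption on my part; the
> SEP does not spell this out), the roles would look roughly like:
>
>     import org.apache.curator.framework.CuratorFramework;
>     import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
>
>     // Given an already-started CuratorFramework 'client':
>     // Leader: raise the barrier, publish the new JobModel, then drop it.
>     DistributedBarrier barrier =
>         new DistributedBarrier(client, "/app/jobModelBarrier_v2");
>     barrier.setBarrier();
>     // ... write JobModel version 2 to its znode ...
>     barrier.removeBarrier();
>
>     // Followers: block until the leader drops the barrier, then read
>     // the published JobModel and resume processing.
>     barrier.waitOnBarrier();
>
> Spelling out which side plays which role would answer the question.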
>
>
> - Additional question: if the input topic partitions change, or the SSP
> grouper changes, the task-to-ssp mapping changes and the task locality may
> not make sense at all. Is this considered out-of-scope for the current
> design? Or should we at least add a version number to the task-to-ssp map
> and associate it with the task locality info?
>
>
> - Should the leader validate that everyone has picked up the new version
> of the JobModel and reported the task locality expected in the JobModel,
> after step 10 in the graph?
>
>
> - Why are we missing a processor-to-locationId mapping in the zknode data
> model? Is it a value written to the processor.00000N node? Also, why don't
> we write locationId as a value on the task0N znode, instead of as a child
> node? And which znode is the distributed barrier that you used in the
> graph?
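>
> For reference, here is one possible layout that would answer these
> questions; this is purely my guess at what the SEP intends:
>
>     /<app-id>
>       /processors
>         /processor.000001    (value: that processor's locationId?)
>       /taskLocality
>         /task01              (value: locationId, instead of a child znode?)
>       /jobModels
>         /<version>           (value: serialized JobModel)
>       /barrier               (is this the barrier used in the graph?)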
>
>
> - In State store restoration: “nonexistent local stores will be restored…”
> is not clear to me. What do you mean by “nonexistent” here?
>
>
> - So, based on the description in “Container to physical host assignment”,
> you will need to know the full two-level mapping in the locality info:
> task-to-container-to-physical-host. If that’s the case, how do you keep
> the task-to-container mapping in your current znode structure, which only
> has task0N as child znodes? Please make this clear.
>
>
> - For Semantics of host affinity with run.id: “all stream processors of a
> samza application stops” = one run does not hold for YARN-deployed Samza
> applications. The description in that section is also very confusing. As we
> discussed last time, we should define a continuous application run in
> terms of the continuity of local state, checkpoints, and intermediate
> message streams. Within the same run.id, the continuity of the
> states/checkpoints/intermediate message streams is guaranteed, while a
> different run.id would mean a disruptive change to the application,
> throwing away all partial states. The actual conditions that determine
> continuity of states in a Samza application may differ across deployment
> environments and input sources, but the consistent semantic meaning of
> “run.id” should be the continuity of states. With that in mind, the only
> relevant point is probably that host-affinity only makes sense within the
> same run.id.
>
>
> - What’s the definition of LocationId? An interface? An abstract class?
> What’s the specific sub-class for LocationId in RAIN vs. in YARN? And,
> potentially, in other environments like Kubernetes?
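>
> For instance, I would expect something along these lines (just a
> sketch of what I imagine, not a proposal for the exact API):
>
>     // A marker for "where a processor physically runs"; each
>     // environment supplies its own notion of location.
>     public interface LocationId {
>       // e.g. hostname in YARN, or hostname plus other attributes
>       // elsewhere; equals()/hashCode() must be well-defined so that
>       // groupers can compare locations across environments.
>       String id();
>     }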
>
>
> - It seems like we can deprecate the whole BalancingTaskNameGrouper
> altogether.
>
>
> - So, it seems that the LocalityManager interface methods are unchanged,
> i.e. it still only maintains the container-to-physical-resource mapping as
> it does today. That also means that you will somehow need to store the
> task-to-container mapping info in the locality znode as well. It would be
> nice to make it clear how the task-to-container-to-physical-resource
> mapping is stored and read in ZK.
>
>
> - We also need to include compatibility tests after deprecating/changing
> the TaskNameGrouper API, to make sure the default behavior of the default
> groupers stays the same.
>
>
> - In the Compatibility section, remove the future versions (i.e. 0.15)
> since we are not sure when this change can be completed and released yet.
> Also, it seems that we are not changing the persistent data format for
> locality info in the coordinator stream; please make that explicit.
>
>
> - If you are making the LocalityManager class an interface, are you
> planning to make it pluggable as well? Actually, I was thinking the model
> we wanted to go with is to make the metadata store for locality info an
> interface and pluggable, while keeping LocalityManager as a common
> implementation.
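>
> In other words, something like the following (illustrative names
> only):
>
>     import java.util.Map;
>
>     // Pluggable persistence for locality metadata (ZK, coordinator
>     // stream, etc.).
>     public interface LocalityMetadataStore {
>       void writeTaskLocality(String taskId, String locationId);
>       Map<String, String> readTaskLocality();
>     }
>
>     // LocalityManager remains a concrete, common implementation that
>     // delegates persistence to whichever store is configured.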
>
>
> - Lastly, from the grouper API definition, you will not be able to get the
> physical location info if it is not passed in via
> currentGenerationProcessorIds or ContainerModel. How are you going to
> resolve that w/o creating a LocalityManager in the Grouper implementation
> class? I would strongly recommend not creating an instance of
> LocalityManager in the Grouper implementation class.
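>
> One way to avoid that dependency is to pass the locality information
> into the grouper explicitly, e.g. (hypothetical signature):
>
>     Set<ContainerModel> group(
>         Set<TaskModel> tasks,
>         Map<String, String> taskLocality,       // taskId -> locationId
>         Map<String, String> processorLocality); // processorId -> locationId
>
> That keeps the grouper a pure function of its inputs, with no hidden
> reads from ZK or the coordinator stream.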
>
> On Wed, Jan 10, 2018 at 8:00 PM, santhosh venkat <
> santhoshvenkat1...@gmail.com> wrote:
>
>> Hi,
>>
>>
>> I created SEP for SAMZA-1554
>> <https://issues.apache.org/jira/browse/SAMZA-1554>: Host affinity in
>> standalone.
>>
>>
>> The link to the SEP is here:
>>
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957309
>>
>>
>> Please review; comments are welcome.
>>
>>
>> Thanks.
>>
>
>
