Hi Santhosh,

Thanks for the SEP and the latest revisions. Here are some of my comments
based on the latest proposal:

- The basic idea for implementing state-aware task-to-physical-process
assignment in JobModel is not quite clear. ContainerAllocator solves a
different problem in host-affinity: it tries to place the physical process
on the same physical host/resource, while the JobModel performs the
grouping of tasks into container processes. In YARN, we need to solve both
affinity problems in code: ContainerAllocator places the container on the
specific physical host, and the TaskNameGrouper with its balance() method
minimizes the task movement, assuming ContainerAllocator is successful. In
standalone, we only need to solve the assignment of tasks to physical hosts.
So, I think the main mismatch is:

   a) Standalone does not need/rely on ContainerAllocator to associate
a container with a physical host. That association is already done and
reported by the containers in a job.

   b) The current TaskNameGrouper with its balance() method only considers
the existing task-to-container mapping, which assumes that another layer,
the container-to-physical-resource mapping, is done somewhere else. Hence,
it would be nicer to unify the balance() method to directly consider the
existing task-to-physical-resource mapping, which would satisfy both YARN
and standalone deployments.


Having said that, I think that we will need the following in the JobModel
generator:

1) ContainerInfo: (containerId, physicalResourceId)

    * this mapping is the reported mapping from container processes in
standalone, and the preferred mapping in YARN

2) TaskLocality: (taskId, physicalResourceId)

    * this mapping is the task location actually reported by container
processes in both standalone and YARN

Then, the TaskNameGrouper.group() method tries its best to group the
tasks to containerIds w/ the same physicalResourceId in a balanced pattern,
in both YARN and standalone.

In YARN, the JobModel will be used by ContainerAllocator to achieve the
preferred container-to-physical-resource mapping, while in standalone, the
JobModel will be directly used by container processes to start processing.
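
To make the two mappings above concrete, here is a minimal sketch of a
locality-aware grouping step. It is only an illustration under assumptions:
the class name LocalityAwareGrouperSketch, the plain String ids, and the
two-pass strategy are hypothetical and are not the existing TaskNameGrouper
API or the SEP's design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; not the Samza TaskNameGrouper API.
final class LocalityAwareGrouperSketch {

    /**
     * containerInfo: containerId -> physicalResourceId (reported or preferred)
     * taskLocality:  taskId -> last reported physicalResourceId (may be stale)
     * Returns containerId -> taskIds assigned to that container.
     */
    static Map<String, List<String>> group(List<String> taskIds,
                                           Map<String, String> containerInfo,
                                           Map<String, String> taskLocality) {
        if (containerInfo.isEmpty()) {
            throw new IllegalArgumentException("at least one container is required");
        }
        // TreeMap keeps the iteration order over containers deterministic.
        Map<String, List<String>> assignment = new TreeMap<>();
        for (String containerId : containerInfo.keySet()) {
            assignment.put(containerId, new ArrayList<>());
        }
        // Balanced upper bound per container: ceil(tasks / containers).
        int capacity = (taskIds.size() + assignment.size() - 1) / assignment.size();

        // Pass 1: keep a task on its last known resource while capacity allows.
        List<String> unplaced = new ArrayList<>();
        for (String taskId : taskIds) {
            String resource = taskLocality.get(taskId);
            String target = resource == null
                    ? null
                    : leastLoaded(assignment, containerInfo, resource, capacity);
            if (target != null) {
                assignment.get(target).add(taskId);
            } else {
                unplaced.add(taskId);
            }
        }
        // Pass 2: spread the remaining tasks over the least loaded containers.
        for (String taskId : unplaced) {
            String target = leastLoaded(assignment, containerInfo, null, Integer.MAX_VALUE);
            assignment.get(target).add(taskId);
        }
        return assignment;
    }

    // Least loaded container below capacity; a null resource matches any container.
    private static String leastLoaded(Map<String, List<String>> assignment,
                                      Map<String, String> containerInfo,
                                      String resource, int capacity) {
        String best = null;
        for (Map.Entry<String, List<String>> e : assignment.entrySet()) {
            boolean matches = resource == null
                    || resource.equals(containerInfo.get(e.getKey()));
            if (matches && e.getValue().size() < capacity
                    && (best == null || e.getValue().size() < assignment.get(best).size())) {
                best = e.getKey();
            }
        }
        return best;
    }
}
```

For example, with containers c0 on hostA and c1 on hostB, and tasks t0 and
t2 last seen on hostA, t1 on hostB and t3 on a host that no longer exists,
the sketch keeps t0 and t2 on c0, keeps t1 on c1, and moves only t3 (to c1).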


- It is not quite clear to me what the distributed barrier is used for in
the graph. Is it for every container process to pick up a certain version
of JobModel? Who is waiting on the barrier? The leader or the followers? Or
everyone?


- Additional question: if the input topic partitions change, or the SSP
grouper changes, the task-to-ssp mapping changes and the task locality may
not make sense at all. Is this considered out-of-scope for the current
design? Or should we at least add a version number for the task-to-ssp map
and associate it with the task locality info?
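
As a rough illustration of the versioning idea in the question above,
locality info could carry the version of the task-to-ssp map it was recorded
under and be ignored when that version no longer matches. All names here
(VersionedTaskLocalitySketch, taskToSspVersion, usableLocality) are
hypothetical and only meant as a sketch of the idea.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch: task locality tagged with the task-to-ssp map version.
final class VersionedTaskLocalitySketch {
    final int taskToSspVersion;               // assumed monotonically increasing
    final Map<String, String> taskToResource; // taskId -> physicalResourceId

    VersionedTaskLocalitySketch(int taskToSspVersion, Map<String, String> taskToResource) {
        this.taskToSspVersion = taskToSspVersion;
        this.taskToResource = taskToResource;
    }

    // Returns the stored locality only if it was recorded under the current
    // task-to-ssp map version; otherwise the locality is treated as unknown.
    static Map<String, String> usableLocality(VersionedTaskLocalitySketch stored,
                                              int currentTaskToSspVersion) {
        if (stored == null || stored.taskToSspVersion != currentTaskToSspVersion) {
            return Collections.emptyMap();
        }
        return stored.taskToResource;
    }
}
```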


- Should the leader validate that everyone has picked up the new version of
JobModel and reported the correct task-locality expected in the JobModel,
after step 10 in the graph?


- Why are we missing a processor-to-locationId mapping in the zknode data
model? Is it a value written to the processor.00000N node? Also, why don’t
we write locationId as a value to the task0N znode, instead of as a child
node? And which znode is the distributed barrier that you used in the graph?


- In State store restoration: “nonexistent local stores will be restored…”
is not clear to me. What do you mean by “nonexistent” here?


- So, based on the description in “Container to physical host assignment”,
you will need to know the full two-level mapping in the locality info:
task-to-container-to-physical-host. If that’s the case, how do you keep the
task-to-container mapping in your current znode structure, which only has
task0N as child znodes? Please make it clear.


- For Semantics of host affinity with run.id, the statement that “all
stream processors of a samza application stops” = one run is not true for
YARN deployed Samza applications, and the description in that section is
very confusing. As we discussed last time, we should define a continuous
application run in terms of the continuation of the local states,
checkpoints, and intermediate message streams. Within the same run.id, the
continuation of the states/checkpoints/intermediate message streams is
guaranteed, while a different run.id would mean a disruptive change to the
application and throwing away all partial states. The actual conditions
that determine a continuation of states in a Samza application may differ
across deployment environments and input sources, but the consistent
semantic meaning of “run.id” should be the continuation of states. With
that in mind, the only relevant point is probably that host-affinity only
makes sense within the same run.id.


- What’s the definition of LocationId? An interface? An abstract class?
What’s the specific sub-class for LocationId in RAIN vs in YARN? And
potentially, in other environments like Kubernetes?


- It seems like we can deprecate the whole BalancingTaskNameGrouper
altogether.


- So, it seems that the LocalityManager interface methods have not
changed, i.e. it still only maintains the container-to-physical-resource
mapping as it does today. That also means that you will somehow store the
task-to-container mapping info in the locality znode as well. It would be
nice to make it clear how the task-to-container-to-physical-resource
mapping is stored and read in ZK.


- We also need to include a compatibility test after deprecating/changing
the TaskNameGrouper API, to make sure the default behavior of the default
groupers stays the same.


- In the Compatibility section, remove the future versions (i.e. 0.15)
since we are not sure yet when this change can be completed and released.
It seems that we are not changing the persistent data format for locality
info in the coordinator stream. Please make that explicit.


- If you are making the LocalityManager class an interface, are you
planning to make it pluggable as well? Actually, I was thinking that the
model we wanted to move toward is making the metadata store for locality
info an interface and pluggable, while keeping the LocalityManager as a
common implementation.
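
A minimal sketch of that layering, assuming hypothetical names throughout
(LocalityStoreSketch, InMemoryLocalityStoreSketch, LocalityManagerSketch):
the store is the pluggable interface, and a single common manager is written
against it. This is not the current LocalityManager API, only an
illustration of the split.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical pluggable metadata store for locality info. A ZK-backed or
// coordinator-stream-backed implementation would sit behind this interface.
interface LocalityStoreSketch {
    void put(String key, String value);
    Map<String, String> readAll();
}

// In-memory implementation, used here only to keep the sketch runnable.
final class InMemoryLocalityStoreSketch implements LocalityStoreSketch {
    private final Map<String, String> data = new HashMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }

    @Override
    public Map<String, String> readAll() {
        return new HashMap<>(data); // defensive copy
    }
}

// Common manager implementation that depends only on the store interface.
final class LocalityManagerSketch {
    private final LocalityStoreSketch store;

    LocalityManagerSketch(LocalityStoreSketch store) {
        this.store = store;
    }

    void writeContainerLocation(String containerId, String locationId) {
        store.put(containerId, locationId);
    }

    Map<String, String> readContainerLocations() {
        return store.readAll();
    }
}
```

With this split, swapping the backing store would not require a second
manager implementation.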


- Lastly, from the grouper API definition, you will not be able to get the
physical location info if it is not passed in via
currentGenerationProcessorIds or ContainerModel. How are you going to
resolve that w/o creating a LocalityManager in the Grouper implementation
class? I would strongly recommend not creating an instance of
LocalityManager in the Grouper implementation class.

On Wed, Jan 10, 2018 at 8:00 PM, santhosh venkat <
santhoshvenkat1...@gmail.com> wrote:

> Hi,
>
>
> I created SEP for SAMZA-1554
> <https://issues.apache.org/jira/browse/SAMZA-1554>: Host affinity in
> standalone.
>
>
> The link to the SEP is here:
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957309
>
>
> Please review and comments are welcome.
>
>
> Thanks.
>
