Hi Santhosh,

Thanks for the SEP and the latest revisions. Here are some of my comments based on the latest proposal:
- The basic idea for implementing state-aware task-to-physical-process assignment in the JobModel is not quite clear. ContainerAllocator solves a different problem for host affinity: it tries to place the container process on the same physical host/resource, while the JobModel performs the grouping of tasks into container processes. In YARN, we need to solve both affinity problems in code: ContainerAllocator places the container on the specific physical host, and the TaskNameGrouper with the balance() method minimizes task movement, assuming ContainerAllocator succeeds. In standalone, we only need to solve the assignment of tasks to physical hosts. So, I think the main mismatch is:
  a) Standalone does not need/rely on ContainerAllocator to associate a container with a physical host. That association is already done and reported by the containers in a job.
  b) The current TaskNameGrouper with the balance() method only considers the existing task-to-container mapping, which assumes another layer of container-to-physical-resource mapping is done somewhere else.
  Hence, it would be nicer to unify the balance() method to directly consider the existing task-to-physical-resource mapping, which satisfies both YARN and standalone deployments. Having said that, I think we will need the following in the JobModel generator (see the sketch after this list):
  1) ContainerInfo: (containerId, physicalResourceId)
     * this is the mapping reported by container processes in standalone, and the preferred mapping in YARN
  2) TaskLocality: (taskId, physicalResourceId)
     * this is the actual task location reported by container processes in both standalone and YARN
  Then, the TaskNameGrouper.group() method tries its best to group the tasks into containerIds with the same physicalResourceId in a balanced pattern, in both YARN and standalone. In YARN, the JobModel will be used by ContainerAllocator to achieve the preferred container-to-physical-resource mapping, while in standalone, the JobModel will be used directly by container processes to start processing.
- It is not quite clear to me what the distributed barrier in the graph is used for. Is it for every container process to pick up a certain version of the JobModel? Who is waiting on the barrier: the leader, the followers, or everyone?
- An additional question: if the input topic partitions change, or the SSP grouper changes, then the task-to-SSP mapping changes and the task locality may not make sense at all. Is this considered out of scope for the current design? Or should we at least add a version number for the task-to-SSP map and associate it with the task locality info?
- Should the leader validate that everyone has picked up the new version of the JobModel and reported the task locality expected by the JobModel, after step 10 in the graph?
- Why are we missing a processor-to-locationId mapping in the znode data model? Is it a value written to the processor.00000N node? Also, why don't we write locationId as a value of the task0N znode, instead of as a child node? And which znode is the distributed barrier that you used in the graph?
- In "State store restoration", "nonexistent local stores will be restored…" is not clear to me. What do you mean by "nonexistent" here?
- Based on the description in "Container to physical host assignment", you will need to know the full two-level mapping in the locality info: task-to-container-to-physical-host. If that's the case, how do you keep the task-to-container mapping in your current znode structure, which only has task0N as child znodes? Please make this clear.
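To make the first comment concrete, here is a minimal sketch of the two mappings and the unified grouper I have in mind. ContainerInfo, TaskLocality, and UnifiedTaskNameGrouper are illustrative names, not a proposed final API; ContainerModel and TaskModel are the existing Samza model classes.

import java.util.Map;
import java.util.Set;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;

/** Reported (standalone) or preferred (YARN) container placement. */
class ContainerInfo {
  final String containerId;
  final String physicalResourceId; // e.g. a host name, or a standalone locationId
  ContainerInfo(String containerId, String physicalResourceId) {
    this.containerId = containerId;
    this.physicalResourceId = physicalResourceId;
  }
}

/** Last reported physical location of a task, in both deployment modes. */
class TaskLocality {
  final String taskId;
  final String physicalResourceId;
  TaskLocality(String taskId, String physicalResourceId) {
    this.taskId = taskId;
    this.physicalResourceId = physicalResourceId;
  }
}

/**
 * A unified grouper: group() takes the existing task-to-physical-resource
 * mapping directly, so one implementation serves both YARN and standalone,
 * and the separate balance() step is no longer needed.
 */
interface UnifiedTaskNameGrouper {
  Set<ContainerModel> group(
      Set<TaskModel> tasks,
      Map<String, ContainerInfo> containers,     // keyed by containerId
      Map<String, TaskLocality> taskLocalities); // keyed by taskId
}

The point is that group() sees the task-to-physical-resource mapping directly, so the same implementation works in both environments, and only YARN needs the extra ContainerAllocator step to realize the preferred container placement.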
- For "Semantics of host affinity with run.id": "all stream processors of a Samza application stop" = one run is not true for YARN-deployed Samza applications, and the description in that section is very confusing. As we discussed last time, we should define what a continuous application run is in terms of the continuation of local state, checkpoints, and intermediate message streams. Within the same run.id, the continuation of state/checkpoints/intermediate message streams is guaranteed, while a different run.id means a disruptive change to the application, throwing away all partial state. The actual conditions that determine a continuation of state in a Samza application may differ across deployment environments and input sources, but the consistent semantic meaning of run.id should be the continuation of state. With that in mind, the only relevant point is probably that host affinity only makes sense within the same run.id.
- What is the definition of LocationId? An interface? An abstract class? What is the specific subclass for LocationId in RAIN vs. in YARN, and potentially in other environments like Kubernetes? (See the first sketch below.)
- It seems like we can deprecate the whole BalancingTaskNameGrouper altogether.
- It seems that the LocalityManager has no change in its interface methods, i.e. it still only maintains the container-to-physical-resource mapping as it does today. That also means you will somehow need to store the task-to-container mapping in the locality znode as well. It would be nice to make clear how the task-to-container-to-physical-resource mapping is stored and read in ZK.
- We also need to include a compatibility test after deprecating/changing the TaskNameGrouper API, to make sure the default behavior of the default groupers stays the same.
- In the Compatibility section, remove the future versions (i.e. 0.15), since we are not sure when this change can be completed and released yet. Also, it seems that we are not changing the persistent data format for the locality info in the coordinator stream; please make that explicit.
- If you are making the LocalityManager class an interface, are you planning to make it pluggable as well? Actually, the model I was thinking we want is to make the metadata store for the locality info an interface and pluggable, while keeping the LocalityManager as a common implementation (see the second sketch below).
- Lastly, with the grouper API as defined, you will not be able to get the physical location info if it is not passed in via currentGenerationProcessorIds or the ContainerModel. How are you going to resolve that without creating a LocalityManager in the Grouper implementation class? I would strongly recommend not creating an instance of LocalityManager inside the Grouper implementation (the third sketch below shows one way to pass the locality in instead).
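For the LocationId question, here is the kind of shape I would expect, assuming it is an interface with one implementation per deployment environment (the class names are hypothetical):

/** Illustrative only: a stable identifier for the physical resource a processor runs on. */
interface LocationId {
  String id();
}

/** YARN: the host the container runs on could serve as the location. */
class YarnLocationId implements LocationId {
  private final String hostName;
  YarnLocationId(String hostName) { this.hostName = hostName; }
  public String id() { return hostName; }
}

/** Kubernetes (hypothetical): the node a pod is scheduled on could play the same role. */
class KubernetesLocationId implements LocationId {
  private final String nodeName;
  KubernetesLocationId(String nodeName) { this.nodeName = nodeName; }
  public String id() { return nodeName; }
}

Whatever the answer, the SEP should spell out the contract (equality, serialization) since the groupers and the znode layout will both key on it.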
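On pluggability, a rough sketch of the direction I was describing, with the store as the pluggable interface and LocalityManager kept as the single common implementation on top of it (the interface and method names are hypothetical):

import java.util.Map;

/** Hypothetical pluggable persistence for locality info: ZK znodes in
    standalone, the coordinator stream in YARN, etc. */
interface LocalityMetadataStore {
  void writeTaskLocality(String taskId, String locationId);
  void writeContainerLocality(String containerId, String locationId);
  Map<String, String> readTaskLocality();      // taskId -> locationId
  Map<String, String> readContainerLocality(); // containerId -> locationId
}

/** LocalityManager stays a common implementation that only delegates to
    whichever store the deployment configures; it is not itself pluggable. */
class LocalityManager {
  private final LocalityMetadataStore store;
  LocalityManager(LocalityMetadataStore store) { this.store = store; }
  Map<String, String> readTaskLocality() { return store.readTaskLocality(); }
  Map<String, String> readContainerLocality() { return store.readContainerLocality(); }
}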
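And for the last comment, one way to keep LocalityManager out of the grouper entirely: the JobModel generator reads the locality once and passes it in as plain data, so the grouper stays a pure function. A hypothetical wiring (LocalityReader and LocalityAwareTaskNameGrouper are made-up names):

import java.util.Map;
import java.util.Set;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;

/** Hypothetical read-only view of the persisted locality, however it is stored. */
interface LocalityReader {
  Map<String, String> readTaskLocality();      // taskId -> locationId
  Map<String, String> readContainerLocality(); // containerId -> locationId
}

/** Hypothetical grouper signature: locality arrives as immutable data, so the
    grouper never needs to talk to ZK or the coordinator stream itself. */
interface LocalityAwareTaskNameGrouper {
  Set<ContainerModel> group(Set<TaskModel> tasks,
                            Map<String, String> taskLocality,
                            Map<String, String> containerLocality);
}

/** The JobModel generator owns the reader and does the one-time read. */
class JobModelGeneratorSketch {
  static Set<ContainerModel> buildContainerModels(Set<TaskModel> tasks,
                                                  LocalityReader reader,
                                                  LocalityAwareTaskNameGrouper grouper) {
    return grouper.group(tasks, reader.readTaskLocality(), reader.readContainerLocality());
  }
}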
On Wed, Jan 10, 2018 at 8:00 PM, santhosh venkat <
santhoshvenkat1...@gmail.com> wrote:

> Hi,
>
> I created SEP for SAMZA-1554
> <https://issues.apache.org/jira/browse/SAMZA-1554>: Host affinity in
> standalone.
>
> The link to the SEP is here:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957309
>
> Please review and comments are welcome.
>
> Thanks.