Linking Boris' earlier comments to the correct [DISCUSSION] thread: http://mail-archives.apache.org/mod_mbox/samza-dev/201801.mbox/%3CCAPAaT%2BtH2H5TEvFQUn9jw6iR%3DyvVEu46rDLJsqexpwKz0CAH1g%40mail.gmail.com%3E
On Thu, Feb 1, 2018 at 5:27 PM, Yi Pan <nickpa...@gmail.com> wrote:
> Hi, Santhoosh,
>
> Thanks for the SEP and the latest revisions. Here are some of my comments
> based on the latest proposal:
>
> - The basic idea for implementing state-aware task-to-physical-process
> assignment in JobModel is not quite clear. ContainerAllocator solves a
> different problem in host affinity: it tries to place the physical process
> on the same physical host/resource, while the JobModel performs the
> grouping of tasks into container processes. In YARN, we need to solve both
> affinity problems in code: ContainerAllocator to place the container on a
> specific physical host, and the TaskNameGrouper with the balance() method
> to minimize task movement, assuming ContainerAllocator is successful. In
> standalone, we only need to solve the assignment of tasks to physical
> hosts. So, I think the main mismatch is:
>
> a) Standalone does not need/rely on ContainerAllocator to associate a
> container with a physical host. That association is already done and
> reported by the containers in a job.
>
> b) The current TaskNameGrouper with the balance() method only considers
> the existing task-to-container mapping, which assumes another layer of
> container-to-physical-resource mapping is done somewhere else. Hence, it
> would be nicer to unify the balance() method to directly consider the
> existing task-to-physical-resource mapping, which satisfies both YARN and
> standalone deployments.
>
> Having said that, I think we will need the following in the JobModel
> generator:
>
> 1) ContainerInfo: (containerId, physicalResourceId)
>
> * this is the mapping reported by container processes in standalone, and
> the preferred mapping in YARN
>
> 2) TaskLocality: (taskId, physicalResourceId)
>
> * this is the task location actually reported by container processes in
> both standalone and YARN
>
> Then, the TaskNameGrouper.group() method tries its best to group the tasks
> into containerIds with the same physicalResourceId in a balanced pattern,
> in both YARN and standalone (see the sketch below).
>
> In YARN, the JobModel will be used by ContainerAllocator to achieve the
> preferred container-to-physical-resource mapping, while in standalone, the
> JobModel will be directly used by container processes to start processing.
>
> - It is not quite clear to me what the distributed barrier in the graph is
> used for. Is it for every container process to pick up a certain version
> of the JobModel? Who is waiting on the barrier? The leader or the
> followers? Or everyone?
>
> - Additional question: if the input topic partitions change, or the SSP
> grouper changes, the task-to-SSP mapping changes and the task locality may
> not make sense at all. Is this considered out-of-scope for the current
> design? Or should we at least add a version number for the task-to-SSP map
> and associate it with the task locality info?
>
> - Should the leader validate that everyone has picked up the new version
> of the JobModel and reported the task locality expected in the JobModel,
> after step 10 in the graph?
>
> - Why are we missing a processor-to-locationId mapping in the znode data
> model? Is it a value written to the processor.00000N node? Also, why don't
> we write locationId as a value to the task0N znode, instead of as a child
> node? And which znode is the distributed barrier that you used in the
> graph?
>
> - In “State store restoration”: “nonexistent local stores will be
> restored…” is not clear to me. What do you mean by “nonexistent” here?
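>
> To make the JobModel generator idea above concrete, here is a rough
> sketch of the two mappings and a locality-aware group() method. All
> names and signatures below are illustrative only, not the actual Samza
> API; ContainerModel and TaskModel refer to the existing Samza model
> classes:
>
>   import java.util.List;
>   import java.util.Set;
>
>   import org.apache.samza.job.model.ContainerModel;
>   import org.apache.samza.job.model.TaskModel;
>
>   // 1) ContainerInfo: (containerId, physicalResourceId) -- the mapping
>   //    reported by container processes in standalone, and the preferred
>   //    mapping in YARN. Hypothetical class, for illustration only.
>   class ContainerInfo {
>     final String containerId;
>     final String physicalResourceId;
>     ContainerInfo(String containerId, String physicalResourceId) {
>       this.containerId = containerId;
>       this.physicalResourceId = physicalResourceId;
>     }
>   }
>
>   // 2) TaskLocality: (taskId, physicalResourceId) -- the task location
>   //    actually reported by container processes in both standalone and
>   //    YARN. Hypothetical class, for illustration only.
>   class TaskLocality {
>     final String taskId;
>     final String physicalResourceId;
>     TaskLocality(String taskId, String physicalResourceId) {
>       this.taskId = taskId;
>       this.physicalResourceId = physicalResourceId;
>     }
>   }
>
>   // A grouper that takes both mappings: it should do its best to assign
>   // each task to a container whose physicalResourceId matches the
>   // task's last reported location, while keeping the groups balanced.
>   interface LocalityAwareTaskNameGrouper {
>     Set<ContainerModel> group(Set<TaskModel> tasks,
>                               List<ContainerInfo> containerInfos,
>                               List<TaskLocality> taskLocalities);
>   }
>
> An implementation of group() could simply fall back to a plain balanced
> assignment for any task whose last reported location no longer exists.
>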
> - So, based on the description in “Container to physical host assignment”,
> you will need to know the full two-level mapping in the locality info:
> task-to-container-to-physical-host. If that is the case, how do you keep
> the task-to-container mapping in your current znode structure, which only
> has task0N as child znodes? Please make this clear.
>
> - For “Semantics of host affinity with run.id”: “all stream processors of
> a samza application stops” = one run is not true for YARN-deployed Samza
> applications, and the description in that section is very confusing. As we
> discussed last time, we should define what a continuous application run is
> in terms of the continuity of the local states, checkpoints, and
> intermediate message streams. Within the same run.id, the continuation of
> the states/checkpoints/intermediate message streams is guaranteed, while a
> different run.id would mean a disruptive change to the application and
> throwing away all partial states. The actual conditions that determine a
> continuation of states in a Samza application may differ across deployment
> environments and input sources, but the consistent semantic meaning of
> “run.id” should be the continuation of states. With that in mind, the only
> relevant point is probably that host affinity only makes sense within the
> same run.id.
>
> - What is the definition of LocationId? An interface? An abstract class?
> What is the specific sub-class for LocationId in RAIN vs. in YARN? And
> potentially, in other environments like Kubernetes? (See the sketch after
> these comments.)
>
> - It seems like we can deprecate the whole BalancingTaskNameGrouper
> altogether.
>
> - So, it seems that the LocalityManager has no change in its interface
> methods, i.e. it still only maintains the container-to-physical-resource
> mapping as it does today. That also means that you will somehow store the
> task-to-container mapping info in the locality znode as well. It would be
> nice to make it clear how the task-to-container-to-physical-resource
> mapping is stored and read in ZK.
>
> - We also need to include compatibility tests after deprecating/changing
> the TaskNameGrouper API, to make sure the default behavior of the default
> groupers stays the same.
>
> - In the Compatibility section, remove the future version numbers (i.e.
> 0.15), since we are not sure when this change can be completed and
> released yet. It also seems that we are not changing the persistent data
> format for locality info in the coordinator stream; please make that
> explicit.
>
> - If you are making the LocalityManager class an interface, are you
> planning to make it pluggable as well? Actually, I was thinking the model
> we wanted to go with is to make the metadata store for locality info an
> interface and pluggable, while keeping LocalityManager as a common
> implementation.
>
> - Lastly, from the grouper API definition, you will not be able to get the
> physical location info if it is not passed in via
> currentGenerationProcessorIds or ContainerModel. How are you going to
> resolve that without creating a LocalityManager in the Grouper
> implementation class? I would strongly recommend not creating an instance
> of LocalityManager in the Grouper implementation class.
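>
> For the LocationId and pluggable-store questions above, the shape I have
> in mind is roughly the following. Everything here is a hypothetical
> sketch, not a committed design:
>
>   import java.util.Map;
>
>   // LocationId as an interface, with environment-specific
>   // implementations. Equal ids mean "same location" for the purpose
>   // of host affinity.
>   interface LocationId {
>     String id();
>   }
>
>   // A possible implementation for host-based environments
>   // (standalone, YARN).
>   class HostLocationId implements LocationId {
>     private final String hostName;
>     HostLocationId(String hostName) { this.hostName = hostName; }
>     @Override
>     public String id() { return hostName; }
>   }
>
>   // Hypothetical pluggable metadata store for locality info;
>   // LocalityManager would then be a common implementation layered on
>   // top of a store like this.
>   interface LocalityMetadataStore {
>     void writeTaskLocality(String taskId, LocationId location);
>     Map<String, LocationId> readTaskLocalities();
>   }
>
> With a store interface like this, the resolved localities can be read by
> the JobModel generator and passed into group() directly, so the Grouper
> implementation never needs to instantiate a LocalityManager itself.
>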
> On Wed, Jan 10, 2018 at 8:00 PM, santhosh venkat
> <santhoshvenkat1...@gmail.com> wrote:
>
>> Hi,
>>
>> I created SEP for SAMZA-1554
>> <https://issues.apache.org/jira/browse/SAMZA-1554>: Host affinity in
>> standalone.
>>
>> The link to the SEP is here:
>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75957309
>>
>> Please review; comments are welcome.
>>
>> Thanks.