> On March 25, 2016, 10:26 p.m., Navina Ramesh wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, 
> > line 41
> > <https://reviews.apache.org/r/45144/diff/7/?file=1314982#file1314982line41>
> >
> >     Why should the TaskAssignmentManager be a part of the LocalityManager? 
> > It doesn't seem to be doing much other than providing an accessor 
> > getTaskAssignmentManager to GroupByContainerCount. An extension of 
> > AbstractCoordinatorStreamManager typically performs only one function. This
> > is a deviation that I think is not necessary.
> >     
> >     The TaskAssignmentManager could be instantiated in JobCoordinator and 
> > passed to the "balance" call. Do we really need to couple them together?
> 
> Jake Maes wrote:
>     Explained above in response to Jagadish's review:
>     "
>     1. At one end of the spectrum, they could be completely independent, but 
> that would complicate a number of method signatures in the JobCoordinator
>     2. At the other end of the spectrum, there could be one LocalityManager 
> that is composed of a TaskLocalityManager and a ContainerLocalityManager, 
> each of which handles the coordinator stream interactions. This looks best
> structurally, but since the current LocalityManager is used in SamzaContainer
> and the TaskAssignmentManager will not be, it's questionable whether this
> structure fits the usage pattern.
>     3. I chose the middle, where the TaskAssignmentManager is a field of the
> LocalityManager, which loosely associates them.
>     
>     The latter 2 options both simplify the JobCoordinator, and after the 
> upcoming diff, allow us to pass one manager into the balance() method which 
> contains both the task and container mappings. This will be useful for more 
> intelligent implementations of the balance() method, which might try to 
> reassign tasks from containers that were previously running on the same host 
> to a new container also on that host. 
>     "

Oops, I didn't see your previous responses. My comment was mostly questioning
whether such an explicit association is required.

On point 1, I don't think it is a major change in the JobCoordinator 
interfaces. They are all mostly private methods and are going to be refactored 
as a part of Jagadish's work. 

Unlike TaskAssignmentManager, LocalityManager is meant to be accessible by both 
Samza container and job coordinator. This is not the case with 
TaskAssignmentManager as it is fully controlled (read & write) only by the Job 
Coordinator. So I don't see decoupling them affecting the SamzaContainer in any
way. If anything, I feel that the coupling gives the SamzaContainer a loophole
to access the TaskAssignmentManager :)

In my view, "balance" seems like a specific implementation of TaskNameGrouper
that could have been called BalancedGroupByContainerCount. Granted, this would
inconvenience existing users of GroupByContainerCount, who would have to change
their config.
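
To make the alternative concrete, here is a minimal sketch of a balance step
that receives the previous task-to-container mapping as an explicit argument
instead of reaching it through the LocalityManager. Everything in it is
hypothetical and for illustration only: BalanceSketch, its balance() signature,
and the assumption that container ids are 0..n-1 are mine, not the code in this
diff or any Samza API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch only; not the GroupByContainerCount code under review.
final class BalanceSketch {
  private BalanceSketch() { }

  // previous: task name -> container id from the last run (ids assumed 0..k-1).
  // Returns task name -> container id for containerCount containers, keeping
  // a task on its old container whenever that container still has room.
  static Map<String, Integer> balance(Map<String, Integer> previous, int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("containerCount must be positive");
    }
    // Target sizes differ by at most one; the first `extra` containers get one more.
    int base = previous.size() / containerCount;
    int extra = previous.size() % containerCount;

    List<List<String>> kept = new ArrayList<>();
    for (int i = 0; i < containerCount; i++) {
      kept.add(new ArrayList<>());
    }
    // Tasks that must move: their container is gone or already at its target size.
    Deque<String> pool = new ArrayDeque<>();

    // Sorted iteration keeps the result deterministic.
    for (Map.Entry<String, Integer> e : new TreeMap<>(previous).entrySet()) {
      int id = e.getValue();
      boolean survives = id >= 0 && id < containerCount;
      int target = base + (id < extra ? 1 : 0);
      if (survives && kept.get(id).size() < target) {
        kept.get(id).add(e.getKey());
      } else {
        pool.add(e.getKey());
      }
    }

    // Top up every container from the pool until it reaches its target size.
    Map<String, Integer> result = new TreeMap<>();
    for (int i = 0; i < containerCount; i++) {
      int target = base + (i < extra ? 1 : 0);
      List<String> tasks = kept.get(i);
      while (tasks.size() < target) {
        tasks.add(pool.remove());
      }
      for (String task : tasks) {
        result.put(task, i);
      }
    }
    return result;
  }
}
```

With eight tasks spread evenly over four containers, shrinking to three
containers moves only the two tasks that lived on the removed container. In
this arrangement the JobCoordinator would read the previous mapping from a
TaskAssignmentManager it owns and pass it in, so the LocalityManager never has
to expose it.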


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review125481
-----------------------------------------------------------


On March 25, 2016, 5:30 p.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
> 
> (Updated March 25, 2016, 5:30 p.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan 
> (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Persist the task-to-container mapping in the coordinator stream and use it to 
> minimize the reassignment when the container count changes.
> 
> A new BalancingTaskNameGrouper interface exposes the balance() method, which 
> can be implemented pluggably. 
> GroupByContainerCount has been rewritten in Java with the balance()
> functionality added, because the balance logic is specific to a
> grouper.
> 
> Detailed changes:
> import-control.xml - Update imports that weren't needed in Scala and add some
> for tests.
> LocalityManager.java - Add TaskAssignmentManager. This mostly just keeps the
> JobCoordinator code cleaner, but also associates the two managers for Host
> Affinity info.
> GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements 
> BalancingTaskNameGrouper
> GroupByContainerCountFactory - Rewritten in Java
> TestStorageRecovery - Old test depended on the order of the partitions in a 
> container. Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java and lots of tests added for 
> balance()
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the 
> new functionality without breaking backward compatibility
> TaskAssignmentManager - Coordinator stream manager for task-to-container 
> mapping
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with 
> taskname groupers, old and new
> SetTaskContainerMapping - Coordinator stream message for the task-to-container
> mapping
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java acf93525ea5c97df187bbe7977e2ae9fea65b32b
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java 211b64215f26db49cd4411ff3fb41231145307d7
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 4d093b500b7f3b582446634ced3e9d0b28371616
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 06a96ad6ed786c22924017f894413bfa1ea34c06
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java e0d4aa1016d61ce328d7ff74b58f7b8f7682f386
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java 429573b480112c7491303dc410d78f37a308c4a7
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 6e9c6fa579a5901000bea0601c771783d8334f0e
> 
> Diff: https://reviews.apache.org/r/45144/diff/
> 
> 
> Testing
> -------
> 
> A bunch of new unit tests have been added. 
> 
> Also tested with a test job. The task mapping (initially missing) was added
> the first time the job ran. It was then used as expected to reduce task
> reassignment as the container count was adjusted from 4->3->5 on subsequent
> runs.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>
