> On March 23, 2016, 7:57 a.m., Jagadish Venkatraman wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, 
> > line 40
> > <https://reviews.apache.org/r/45144/diff/1/?file=1309649#file1309649line40>
> >
> >     Please make this final (though this is unrelated to your change.) 
> > SAMZA-902 tracks refactoring to make Samza classes more thread-safe.

I was going to do this, but it is assigned on line 108.


> On March 23, 2016, 7:57 a.m., Jagadish Venkatraman wrote:
> > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java, 
> > line 142
> > <https://reviews.apache.org/r/45144/diff/1/?file=1309649#file1309649line142>
> >
> >     Why do we need a setter here? I recommend moving this setter to the 
> > constructor of LocalityManager. 
> >     
> >     The following properties nicely fall out :
> >     1. We could make taskAssignmentManager final as a result.
> >     2. It becomes easy to reason about this class as the object created is 
> > consistent right away. As opposed to waiting for the creator to call a 
> > setter() before you can call get() on the taskAssignmentManager.
> >     3. It makes my life for SAMZA-902 easier ;)

Because I wasn't sure how tightly I wanted to associate the 
TaskAssignmentManager and the LocalityManager. :-)

- At one end of the spectrum, they could be completely independent, but that 
would complicate a number of method signatures in the JobCoordinator
 
- At the other end of the spectrum, there could be one LocalityManager that is 
composed of a TaskLocalityManager and a ContainerLocalityManager, each of which 
handles the coordinator stream interactions. This looks best structurally, but 
since the current LocalityManager is used in SamzaContainer and the 
TaskAssignmentManager will not be, it's questionable whether this structure fits 
the usage pattern.

- I chose the middle, where the TaskAssignmentManager is a field of the 
LocalityManager, which loosely associates them. 
 
The latter 2 options both simplify the JobCoordinator and, after the upcoming 
diff, allow us to pass a single manager, containing both the task and container 
mappings, into the balance() method. This will be useful for more intelligent 
implementations of the balance() method, which might try to reassign tasks from 
containers that were previously running on a given host to a new container 
on that same host. 

Anyway, you're right, if I'm going to use approach 3, I should commit to it and 
make the field final! Thanks for the suggestion.
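
For reference, a minimal sketch of what committing to approach 3 could look 
like. The classes below are hypothetical stand-ins, not the real Samza classes, 
and the constructor signature is an assumption made only for illustration.

```java
import java.util.Objects;

public class ConstructorInjectionSketch {
  // Hypothetical placeholder for the real TaskAssignmentManager.
  static class TaskAssignmentManager { }

  // Hypothetical placeholder for the real LocalityManager.
  static class LocalityManager {
    // final: assigned exactly once, in the constructor, so there is no setter.
    private final TaskAssignmentManager taskAssignmentManager;

    LocalityManager(TaskAssignmentManager taskAssignmentManager) {
      // Fail early if the caller forgets to supply the manager.
      this.taskAssignmentManager =
          Objects.requireNonNull(taskAssignmentManager, "taskAssignmentManager");
    }

    TaskAssignmentManager getTaskAssignmentManager() {
      return taskAssignmentManager;
    }
  }

  public static void main(String[] args) {
    TaskAssignmentManager tam = new TaskAssignmentManager();
    LocalityManager lm = new LocalityManager(tam);
    // The object is consistent as soon as it is constructed.
    System.out.println(lm.getTaskAssignmentManager() == tam); // true
  }
}
```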


> On March 23, 2016, 7:57 a.m., Jagadish Venkatraman wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java,
> >  line 89
> > <https://reviews.apache.org/r/45144/diff/1/?file=1309651#file1309651line89>
> >
> >     allContainers.size is used in 4 places. Maybe, use a local variable 
> > called - prevContainerCount? It also makes it easy to read that 
> > allContainers.size() is actually referring to container count prior to the 
> > balance.

Five places, actually. Thanks for noticing this. I'll do exactly that.
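
Roughly the shape I have in mind, as a sketch only. The describeChange() helper 
is hypothetical and exists just to show the named local; it is not code from 
GroupByContainerCount.

```java
import java.util.Arrays;
import java.util.List;

public class PrevContainerCountSketch {
  // Hypothetical helper, not taken from GroupByContainerCount.
  static String describeChange(List<String> allContainers, int newContainerCount) {
    // Name the size once so every later use reads as "count before the balance".
    final int prevContainerCount = allContainers.size();

    if (newContainerCount == prevContainerCount) {
      return "unchanged at " + prevContainerCount;
    }
    String direction = newContainerCount > prevContainerCount ? "increase" : "decrease";
    return direction + " from " + prevContainerCount + " to " + newContainerCount;
  }

  public static void main(String[] args) {
    List<String> allContainers = Arrays.asList("c0", "c1", "c2", "c3");
    System.out.println(describeChange(allContainers, 3)); // decrease from 4 to 3
    System.out.println(describeChange(allContainers, 5)); // increase from 4 to 5
  }
}
```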


> On March 23, 2016, 7:57 a.m., Jagadish Venkatraman wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java,
> >  line 180
> > <https://reviews.apache.org/r/45144/diff/1/?file=1309651#file1309651line180>
> >
> >     This logic is well written :-)
> >     
> >     It will really help if you can illustrate with a small example as a 
> > comment.
> >     1. when counts increase.
> >     2. when counts decrease.
> 
> Jagadish Venkatraman wrote:
>     Here's an idea, save it for later if you think it's a total diversion. 
> But, I believe it's worth investigating.
>     
>     How about making the TaskAssignmentManager immutable? That way, the 
> TaskAssignmentManager is given the list of assignments when it starts and 
> the assignments do not change.
>     
>     The following are the advantages:
>     1. Currently, the map that tracks <taskNameToContainerId> can be made 
> final.
>     2. The mapping can be initialized once during construction. The 
> constructor could take care of deleting the old mappings, and writing 
> the new mappings into the coordinator stream.
>     3. Do we change the map after the JobCoordinator starts/stops containers? 
> If not, then it makes sense for it to be immutable.
>     4. We can fail early during construction time.
>     5. Methods that return the taskNameToContainerId can now return a cached 
> copy as opposed to re-bootstrapping from coordinator stream each time.
> 
> Jagadish Venkatraman wrote:
>     I just realized that it's hard to implement this. I believe that the 
> TaskAssignmentManager has been modelled according to the LocalityManager that 
> obtains the container locality. Never mind.

There are some comments to that effect in the unit tests. I could copy them 
over, but they sort of dominate the file.

About immutability: one way to do it is to remove the map altogether. It 
really only enables the log messages, which I could put in the grouper.
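
To illustrate the two cases asked about above (counts increase, counts 
decrease), here is a simplified sketch. It is not the GroupByContainerCount 
logic from the diff; it is a stand-in that assumes container IDs are 0..N-1, 
keeps tasks where they are when the target size allows, and moves only the 
overflow. The class and method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BalanceSketch {
  // Hypothetical stand-in: previous task -> container ID, rebalanced to a new count.
  static Map<String, Integer> balance(Map<String, Integer> prev, int newContainerCount) {
    if (newContainerCount <= 0) {
      throw new IllegalArgumentException("newContainerCount must be positive");
    }
    // Group tasks by previous container; sorted maps keep the result deterministic.
    Map<Integer, List<String>> byContainer = new TreeMap<>();
    for (Map.Entry<String, Integer> e : new TreeMap<>(prev).entrySet()) {
      byContainer.computeIfAbsent(e.getValue(), k -> new ArrayList<>()).add(e.getKey());
    }
    int base = prev.size() / newContainerCount;   // every container gets at least this many
    int extra = prev.size() % newContainerCount;  // the first 'extra' containers get one more

    Map<String, Integer> result = new TreeMap<>();
    Deque<String> pool = new ArrayDeque<>();      // tasks that must move
    int[] kept = new int[newContainerCount];
    for (Map.Entry<Integer, List<String>> e : byContainer.entrySet()) {
      int id = e.getKey();
      boolean survives = id >= 0 && id < newContainerCount;
      int target = survives ? base + (id < extra ? 1 : 0) : 0;
      List<String> tasks = e.getValue();
      for (int i = 0; i < tasks.size(); i++) {
        if (i < target) {
          result.put(tasks.get(i), id);           // task stays on its container
          kept[id]++;
        } else {
          pool.add(tasks.get(i));                 // overflow, or its container is gone
        }
      }
    }
    // Top up containers that are below their target from the pool.
    for (int id = 0; id < newContainerCount; id++) {
      int target = base + (id < extra ? 1 : 0);
      while (kept[id] < target) {
        result.put(pool.remove(), id);
        kept[id]++;
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // 1. Count increases, 2 -> 3: one task leaves each old container.
    Map<String, Integer> two = new HashMap<>();
    two.put("t0", 0); two.put("t1", 0); two.put("t2", 0);
    two.put("t3", 1); two.put("t4", 1); two.put("t5", 1);
    System.out.println(balance(two, 3)); // {t0=0, t1=0, t2=2, t3=1, t4=1, t5=2}

    // 2. Count decreases, 3 -> 2: only the removed container's tasks move.
    Map<String, Integer> three = new HashMap<>();
    three.put("t0", 0); three.put("t1", 0); three.put("t2", 1);
    three.put("t3", 1); three.put("t4", 2); three.put("t5", 2);
    System.out.println(balance(three, 2)); // {t0=0, t1=0, t2=1, t3=1, t4=0, t5=1}
  }
}
```

In both runs only two of the six tasks change containers. This sketch makes no 
claim to find the minimum number of moves in every case.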


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review124812
-----------------------------------------------------------


On March 22, 2016, 1:20 a.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
> 
> (Updated March 22, 2016, 1:20 a.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan 
> (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Persist the task-to-container mapping in the coordinator stream and use it to 
> minimize the reassignment when the container count changes.
> 
> A new BalancingTaskNameGrouper interface exposes the balance() method, which 
> can be implemented pluggably. 
> GroupByContainerCount has been rewritten in Java and the balance() 
> functionality added. This is because the balance logic is specific to a 
> grouper.
> 
> Detailed changes:
> import-control.xml - Update imports that weren't needed in scala and add some 
> for tests.
> LocalityManager.java - Add TaskAssignment manager. This mostly just keeps the 
> JobCoordinator code cleaner, but also associates the 2 managers for Host 
> Affinity info.
> GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements 
> BalancingTaskNameGrouper
> GroupByContainerCountFactory - Rewritten in Java
> TestStorageRecovery - Old test depended on the order of the partitions in a 
> container. Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java and lots of tests added for 
> balance()
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the 
> new functionality without breaking backward compatibility
> TaskAssignmentManager - Coordinator stream manager for task-to-container 
> mapping
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with 
> taskname groupers, old and new
> SetTaskContainerMapping - Coordinator stream message for the task-to-container 
> mapping
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java 
> acf93525ea5c97df187bbe7977e2ae9fea65b32b 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGroupBalancer.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java
>  211b64215f26db49cd4411ff3fb41231145307d7 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java
>  4d093b500b7f3b582446634ced3e9d0b28371616 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java
>  PRE-CREATION 
>   
> samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala
>  cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1 
>   
> samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala
>  8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 06a96ad6ed786c22924017f894413bfa1ea34c06 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGroupBalancer.java
>  PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java 
> PRE-CREATION 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java
>  e0d4aa1016d61ce328d7ff74b58f7b8f7682f386 
>   
> samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java
>  429573b480112c7491303dc410d78f37a308c4a7 
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 
> 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc 
>   
> samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala
>  6e9c6fa579a5901000bea0601c771783d8334f0e 
> 
> Diff: https://reviews.apache.org/r/45144/diff/
> 
> 
> Testing
> -------
> 
> A bunch of new unit tests have been added. 
> 
> Also tested with a test job. The task mapping (initially missing) is added 
> the first time the job is run. It is then used as expected to reduce task 
> reassignment as the container count was adjusted from 4->3->5 on subsequent 
> runs.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>
