> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> >

Thanks for catching these. Patch coming as soon as the unit tests are done.


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java,
> >  line 37
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312392#file1312392line37>
> >
> >     nit: the key in the example shows containerId = 1 and the values show 
> > containerId = 139. They should be the same, right?

The 1 in the key is the message version, so no conflict there.


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java,
> >  line 245
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line245>
> >
> >     It seems like the logic here is assuming:
> >     1) we always try to "fill up" each container from 0-n based on average 
> > count
> >     2) in successive runs, there can't be any task-container mapping 
> > sequence like:
> >     3 tasks -> container 0
> >     3 tasks -> container 1
> >     1 task  -> container 2
> >     3 tasks -> container 3
> >     In the above case, when a new container 4 is added, there will not be 
> > balanced assignment since container 4 will get 3 tasks.
> >     
> >     I would suggest that we don't assume what would be the result from the 
> > previous balance and the calculation is always based on the whole set of 
> > tasks <-> containers. A simple logic would be:
> >     - initialize the set of re-assignable tasks (i.e. from containers that 
> > are removed, from containers that are over the expected average)
> >     - sort all containers based on the current assigned number of tasks, 
> > minus the re-assignable tasks
> >     - start assigning the re-assignable tasks from the end of the sorted 
> > container list, until each container reaches the expected average.
> >     - repeat until the set of re-assignable tasks is empty.

Great attention to detail!
I didn't see that corner case.
I'll add a unit test for this and fix it.
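One way to implement the approach suggested above is sketched below. It is a minimal illustration, not the code in GroupByContainerCount. It assumes tasks are plain String names, containers are ids 0..n-1, and the previous mapping is a Map from task name to container id. The class and method names are hypothetical. Tasks from removed containers, tasks with no previous mapping, and the excess from over-full containers form the re-assignable pool, which then tops up the under-full containers.

```java
import java.util.*;

// Hypothetical sketch of a minimal-movement rebalance; not Samza's GroupByContainerCount.
final class RebalanceSketch {

  // previous: task name -> container id from the last run (may be missing tasks).
  // Returns a new task name -> container id mapping over containers 0..containerCount-1.
  static Map<String, Integer> balance(Map<String, Integer> previous,
                                      Collection<String> taskNames, int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("containerCount must be positive");
    }
    SortedSet<String> allTasks = new TreeSet<>(taskNames);
    List<List<String>> assigned = new ArrayList<>();
    for (int c = 0; c < containerCount; c++) {
      assigned.add(new ArrayList<>());
    }
    // Re-assignable pool: tasks from removed containers or with no previous mapping.
    List<String> reassignable = new ArrayList<>();
    for (String task : allTasks) {
      Integer c = previous.get(task);
      if (c != null && c >= 0 && c < containerCount) {
        assigned.get(c).add(task);
      } else {
        reassignable.add(task);
      }
    }
    int base = allTasks.size() / containerCount;
    int extra = allTasks.size() % containerCount;
    // Rank containers by current load, largest first (List.sort is stable, so ties keep
    // ascending id order). The `extra` fullest containers may hold base + 1 tasks.
    List<Integer> byLoad = new ArrayList<>();
    for (int c = 0; c < containerCount; c++) {
      byLoad.add(c);
    }
    byLoad.sort((a, b) -> Integer.compare(assigned.get(b).size(), assigned.get(a).size()));
    int[] target = new int[containerCount];
    for (int rank = 0; rank < containerCount; rank++) {
      target[byLoad.get(rank)] = base + (rank < extra ? 1 : 0);
    }
    // Move the excess from over-full containers into the pool.
    for (int c = 0; c < containerCount; c++) {
      List<String> list = assigned.get(c);
      while (list.size() > target[c]) {
        reassignable.add(list.remove(list.size() - 1));
      }
    }
    // Top up under-full containers from the pool, in a deterministic order.
    Collections.sort(reassignable);
    Iterator<String> pool = reassignable.iterator();
    for (int c = 0; c < containerCount; c++) {
      List<String> list = assigned.get(c);
      while (list.size() < target[c]) {
        if (!pool.hasNext()) {
          throw new IllegalStateException("ran out of tasks for container " + c);
        }
        list.add(pool.next());
      }
    }
    if (pool.hasNext()) {
      throw new IllegalStateException("tasks left unassigned");
    }
    Map<String, Integer> result = new TreeMap<>();
    for (int c = 0; c < containerCount; c++) {
      for (String task : assigned.get(c)) {
        result.put(task, c);
      }
    }
    return result;
  }
}
```

With the 3/3/1/3 layout from the comment and a fifth container added, this sketch moves three tasks and leaves every container with two. Giving the `base + 1` slots (when the task count doesn't divide evenly) to the currently fullest containers means fewer tasks have to be stripped from them.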


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java,
> >  line 62
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312389#file1312389line62>
> >
> >     Do we have a use case for write-only SamzaTaskAssignmentManager? We 
> > needed it for LocalityManager because SamzaContainer includes an instance 
> > of LocalityManager which is write-only. If we don't have the write-only use 
> > case, we should just remove the write-only flag.

Yeah, classic case of over-engineering. 
https://en.wikipedia.org/wiki/You_aren%27t_gonna_need_it
I'll remove it. :-)


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java,
> >  line 99
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line99>
> >
> >     Question: why does the minimum-movement logic only apply when the 
> > container count changes? Even if the container count is the same, 
> > wouldn't it be beneficial to just honor the previous task assignment for 
> > stateful jobs, instead of re-computing it?

I'm glad you said that. That's actually what this code does: it reuses the 
existing map if the container count doesn't change. It also enables custom 
mappings, where a tool writes a mapping to the coordinator stream for the 
currently expected container count.
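The reuse decision described above could look roughly like this. It is a sketch under assumptions, not the actual GroupByContainerCount code: the mapping is a plain Map from task name to container id, and "reusable" is assumed to mean the saved mapping covers exactly the current tasks and uses exactly the containers 0..n-1. The class, method names, and the `rebalance` supplier are hypothetical.

```java
import java.util.*;
import java.util.function.Supplier;

// Hypothetical sketch: decide whether a persisted task -> container mapping can be reused.
final class MappingReuseSketch {

  // Reusable only if it covers exactly the current tasks and uses exactly
  // containers 0..containerCount-1 (an assumption made for this sketch).
  static boolean canReuse(Map<String, Integer> saved, Set<String> taskNames, int containerCount) {
    if (saved.isEmpty() || !saved.keySet().equals(taskNames)) {
      return false;
    }
    Set<Integer> containers = new HashSet<>(saved.values());
    if (containers.size() != containerCount) {
      return false;
    }
    for (int c : containers) {
      if (c < 0 || c >= containerCount) {
        return false;
      }
    }
    return true;
  }

  // Use the saved (possibly tool-written) mapping when it fits; otherwise rebalance.
  static Map<String, Integer> chooseMapping(Map<String, Integer> saved, Set<String> taskNames,
      int containerCount, Supplier<Map<String, Integer>> rebalance) {
    return canReuse(saved, taskNames, containerCount) ? saved : rebalance.get();
  }
}
```

Because a tool-written mapping goes through the same check, a custom mapping is honored as long as it matches the container count the job is started with.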


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java,
> >  line 227
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line227>
> >
> >     What if taskNamesToAssign.isEmpty()==true here?

It can't be, if the math in calculateTaskCountPerContainer() is right.

I'll add an assertion to make sure.


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java,
> >  line 231
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line231>
> >
> >     What if at the end, taskNamesToAssign is not empty?

Same answer as above. I'll make sure with an assertion.
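Both invariants from the last two comments (the pool never runs dry mid-assignment, and nothing is left over at the end) follow from the per-container counts summing to the task count. A minimal sketch, assuming an even split where the first `taskCount % containerCount` containers get one extra task; `taskCountPerContainer` and `assign` are hypothetical stand-ins, since the real signature of calculateTaskCountPerContainer() isn't shown here. It uses explicit checks rather than Java `assert`, because `assert` statements are skipped unless the JVM runs with `-ea`.

```java
import java.util.*;

// Hypothetical sketch of even task distribution with explicit invariant checks.
final class TaskCountSketch {

  // Each container gets taskCount / containerCount tasks; the first
  // taskCount % containerCount containers get one more, so the counts sum to taskCount.
  static int[] taskCountPerContainer(int taskCount, int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("containerCount must be positive");
    }
    int[] counts = new int[containerCount];
    for (int c = 0; c < containerCount; c++) {
      counts[c] = taskCount / containerCount + (c < taskCount % containerCount ? 1 : 0);
    }
    return counts;
  }

  static Map<Integer, List<String>> assign(List<String> taskNames, int containerCount) {
    int[] counts = taskCountPerContainer(taskNames.size(), containerCount);
    Deque<String> toAssign = new ArrayDeque<>(taskNames);
    Map<Integer, List<String>> result = new TreeMap<>();
    for (int c = 0; c < containerCount; c++) {
      List<String> tasks = new ArrayList<>();
      for (int i = 0; i < counts[c]; i++) {
        // Should never fire if the counts sum to the number of tasks.
        if (toAssign.isEmpty()) {
          throw new IllegalStateException("no tasks left for container " + c);
        }
        tasks.add(toAssign.poll());
      }
      result.put(c, tasks);
    }
    // Likewise, every task should have been handed out.
    if (!toAssign.isEmpty()) {
      throw new IllegalStateException(toAssign.size() + " tasks left unassigned");
    }
    return result;
  }
}
```

For example, 10 tasks over 4 containers gives counts of 3, 3, 2, 2.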


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review125157
-----------------------------------------------------------


On March 24, 2016, 9:33 p.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
> 
> (Updated March 24, 2016, 9:33 p.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan 
> (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Persist the task-to-container mapping in the coordinator stream and use it to 
> minimize the reassignment when the container count changes.
> 
> A new BalancingTaskNameGrouper interface exposes the balance() method, which 
> groupers can implement pluggably. 
> GroupByContainerCount has been rewritten in Java and now implements balance(), 
> because the balance logic is specific to a grouper.
> 
> Detailed changes:
> import-control.xml - Update imports that weren't needed in Scala and add some 
> for tests.
> LocalityManager.java - Add TaskAssignmentManager. This mostly just keeps the 
> JobCoordinator code cleaner, but also associates the two managers for 
> host-affinity info.
> GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements 
> BalancingTaskNameGrouper
> GroupByContainerCountFactory - Rewritten in Java
> TestStorageRecovery - Old test depended on the order of the partitions in a 
> container. Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java and lots of tests added for 
> balance()
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the 
> new functionality without breaking backward compatibility
> TaskAssignmentManager - Coordinator stream manager for task-to-container 
> mapping
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with 
> taskname groupers, old and new
> SetTaskContainerMapping - Coordinator stream message for the task-to-container 
> mapping
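The backward-compatibility pattern described above (a new sub-interface for balance(), plus a bridge that serves old and new groupers) can be sketched with simplified stand-in types. This is not Samza's API: the interface and class names are hypothetical, task names are Strings, and containers are int ids. The bridge is only meant to be in the spirit of TaskNameGroupBalancer.

```java
import java.util.*;

// Hypothetical, simplified stand-ins for the grouper types; not Samza's TaskNameGrouper API.
interface SimpleTaskNameGrouper {
  Map<Integer, Set<String>> group(Set<String> taskNames);
}

// New capability added as a sub-interface, so existing groupers compile unchanged.
interface SimpleBalancingGrouper extends SimpleTaskNameGrouper {
  Map<Integer, Set<String>> balance(Set<String> taskNames, Map<String, Integer> previousMapping);
}

// Bridge: balance when the grouper supports it, otherwise fall back to plain grouping.
final class GrouperBridge {
  static Map<Integer, Set<String>> groupOrBalance(SimpleTaskNameGrouper grouper,
      Set<String> taskNames, Map<String, Integer> previousMapping) {
    if (grouper instanceof SimpleBalancingGrouper) {
      return ((SimpleBalancingGrouper) grouper).balance(taskNames, previousMapping);
    }
    return grouper.group(taskNames);
  }
}
```

Existing groupers never see the new method, so only groupers that opt in by implementing the sub-interface get the balancing behavior.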
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c 
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java acf93525ea5c97df187bbe7977e2ae9fea65b32b 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java 211b64215f26db49cd4411ff3fb41231145307d7 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 4d093b500b7f3b582446634ced3e9d0b28371616 
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1 
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 06a96ad6ed786c22924017f894413bfa1ea34c06 
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java e0d4aa1016d61ce328d7ff74b58f7b8f7682f386 
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java 429573b480112c7491303dc410d78f37a308c4a7 
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc 
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 6e9c6fa579a5901000bea0601c771783d8334f0e 
> 
> Diff: https://reviews.apache.org/r/45144/diff/
> 
> 
> Testing
> -------
> 
> Many new unit tests have been added. 
> 
> Also tested with a test job. The task mapping (initially missing) was added 
> the first time the job ran, and was then used as expected to reduce task 
> reassignment as the container count was adjusted from 4->3->5 on subsequent 
> runs.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>
