> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> >

Thanks for catching these. Patch coming as soon as the unit tests are done.


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java, line 37
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312392#file1312392line37>
> >
> >     nit: the key in the example shows containerId = 1 and the values show containerId = 139. They should be the same, right?

The 1 in the key is the message version, so there is no conflict.


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 245
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line245>
> >
> >     It seems like the logic here is assuming:
> >     1) we always try to "fill up" each container from 0-n based on the average count
> >     2) in successive runs, there can't be any task-container mapping sequence like:
> >         3 tasks -> container 0
> >         3 tasks -> container 1
> >         1 task  -> container 2
> >         3 tasks -> container 3
> >     In the above case, when a new container 4 is added, the assignment will not be balanced, since container 4 will get 3 tasks.
> >
> >     I would suggest that we not assume anything about the result of the previous balance, and always base the calculation on the whole set of tasks <-> containers. A simple approach would be:
> >     - initialize the set of re-assignable tasks (i.e. tasks from containers that are removed and from containers that are over the expected average)
> >     - sort all containers by their current number of assigned tasks, minus the re-assignable tasks
> >     - start assigning the re-assignable tasks from the end of the sorted container list, until each container reaches the expected average
> >     - stop the whole algorithm once the re-assignable task set is empty.

Great attention to detail! I didn't see that corner case.
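A minimal standalone sketch of the suggested approach, assuming tasks are plain string names and containers are integer IDs 0..n-1. It is not the GroupByContainerCount code in this patch; the class and method names (RebalanceSketch, rebalance) are hypothetical. The suggestion leaves open which containers get the extra task when the task count doesn't divide evenly; this sketch gives the extra slots to the currently most-loaded containers so fewer tasks have to move.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the rebalancing suggested in review; not Samza's GroupByContainerCount.
final class RebalanceSketch {
  private RebalanceSketch() {}

  /** Maps every task to a container in [0, containerCount), keeping previous placements where possible. */
  static Map<String, Integer> rebalance(Map<String, Integer> previous, int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("containerCount must be positive");
    }
    int minPerContainer = previous.size() / containerCount;
    int containersWithExtra = previous.size() % containerCount;

    // Current tasks per surviving container; tasks on removed containers become re-assignable.
    Map<Integer, List<String>> tasksByContainer = new TreeMap<>();
    for (int c = 0; c < containerCount; c++) {
      tasksByContainer.put(c, new ArrayList<>());
    }
    Deque<String> reassignable = new ArrayDeque<>();
    for (String task : new TreeSet<>(previous.keySet())) { // sorted for deterministic output
      int c = previous.get(task);
      if (c >= 0 && c < containerCount) {
        tasksByContainer.get(c).add(task);
      } else {
        reassignable.add(task);
      }
    }

    // Expected size per container, computed over the whole set: the most-loaded containers get the extra slots.
    List<Integer> byLoad = new ArrayList<>(tasksByContainer.keySet());
    byLoad.sort((a, b) -> Integer.compare(tasksByContainer.get(b).size(), tasksByContainer.get(a).size()));
    Map<Integer, Integer> expected = new HashMap<>();
    for (int i = 0; i < byLoad.size(); i++) {
      expected.put(byLoad.get(i), minPerContainer + (i < containersWithExtra ? 1 : 0));
    }

    // Containers above their expected size give up their surplus tasks.
    for (Map.Entry<Integer, List<String>> e : tasksByContainer.entrySet()) {
      List<String> tasks = e.getValue();
      while (tasks.size() > expected.get(e.getKey())) {
        reassignable.add(tasks.remove(tasks.size() - 1));
      }
    }

    // Containers below their expected size take re-assignable tasks until the set is empty.
    for (Map.Entry<Integer, List<String>> e : tasksByContainer.entrySet()) {
      List<String> tasks = e.getValue();
      while (tasks.size() < expected.get(e.getKey())) {
        tasks.add(reassignable.remove());
      }
    }
    if (!reassignable.isEmpty()) {
      throw new IllegalStateException(reassignable.size() + " tasks left unassigned");
    }

    Map<String, Integer> result = new TreeMap<>();
    tasksByContainer.forEach((container, assigned) -> assigned.forEach(t -> result.put(t, container)));
    return result;
  }
}
```

With the 3/3/1/3 layout above and a fifth container added, this sketch moves exactly three tasks and ends with two tasks on every container, instead of handing container 4 three tasks.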
I'll add a unit test for this and fix it.


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java, line 62
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312389#file1312389line62>
> >
> >     Do we have a use case for a write-only SamzaTaskAssignmentManager? We needed it for LocalityManager because SamzaContainer includes an instance of LocalityManager which is write-only. If we don't have the write-only use case, we should just remove the write-only flag.

Yeah, classic case of over-engineering. https://en.wikipedia.org/wiki/You_aren%27t_gonna_need_it
I'll remove it. :-)


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 99
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line99>
> >
> >     Question: why do we care that the minimum movement only applies to the case where the container count changes? Even if the container count stays the same, wouldn't it be beneficial to just honor the previous task assignment for stateful jobs, instead of re-computing it?

I'm glad you said that. That's actually what this code does: it reuses the existing mapping if the container count doesn't change. It also enables custom mappings, where some tool writes a mapping to the coordinator stream for the currently expected container count.


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 227
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line227>
> >
> >     What if taskNamesToAssign.isEmpty()==true here?

It can't be, if the math in calculateTaskCountPerContainer() is right. I'll add an assertion to make sure.
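A minimal sketch of such a guard around the assignment loop. The class name and both method signatures are hypothetical stand-ins (the real calculateTaskCountPerContainer() may take different arguments and compute counts differently). It uses explicit IllegalStateException checks rather than Java's `assert` keyword, because `assert` statements are skipped unless the JVM runs with `-ea`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of guarding the task assignment loop; names and signatures are illustrative only.
final class AssignmentGuardSketch {
  private AssignmentGuardSketch() {}

  // Stand-in for calculateTaskCountPerContainer(): spreads taskCount as evenly as possible.
  static int[] calculateTaskCountPerContainer(int taskCount, int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("containerCount must be positive");
    }
    int[] counts = new int[containerCount];
    for (int i = 0; i < containerCount; i++) {
      counts[i] = taskCount / containerCount + (i < taskCount % containerCount ? 1 : 0);
    }
    return counts;
  }

  static Map<String, Integer> assign(List<String> taskNames, int containerCount) {
    int[] counts = calculateTaskCountPerContainer(taskNames.size(), containerCount);
    Deque<String> taskNamesToAssign = new ArrayDeque<>(taskNames);
    Map<String, Integer> assignment = new TreeMap<>();
    for (int container = 0; container < containerCount; container++) {
      for (int i = 0; i < counts[container]; i++) {
        // Fail loudly if the counts ask for more tasks than exist.
        if (taskNamesToAssign.isEmpty()) {
          throw new IllegalStateException("No tasks left to assign to container " + container);
        }
        assignment.put(taskNamesToAssign.poll(), container);
      }
    }
    // Complementary check: no task may be left over once every container is filled.
    if (!taskNamesToAssign.isEmpty()) {
      throw new IllegalStateException(taskNamesToAssign.size() + " tasks were never assigned");
    }
    return assignment;
  }
}
```

When the per-container counts sum to the task count, neither check fires; they only matter if the counts and the task list ever disagree.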


> On March 24, 2016, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, line 231
> > <https://reviews.apache.org/r/45144/diff/2/?file=1312388#file1312388line231>
> >
> >     What if, at the end, taskNamesToAssign is not empty?

Same answer as above. I'll make sure with an assertion.


- Jake


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review125157
-----------------------------------------------------------


On March 24, 2016, 9:33 p.m., Jake Maes wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
> 
> (Updated March 24, 2016, 9:33 p.m.)
> 
> 
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Persist the task-to-container mapping in the coordinator stream and use it to minimize the reassignment when the container count changes.
> 
> A new BalancingTaskNameGrouper interface exposes the balance() method, which can be implemented pluggably.
> GroupByContainerCount has been rewritten in Java and the balance() functionality added. This is because the balance logic is specific to a grouper.
> 
> Detailed changes:
> import-control.xml - Update imports that weren't needed in Scala and add some for tests.
> LocalityManager.java - Add TaskAssignmentManager. This mostly just keeps the JobCoordinator code cleaner, but also associates the 2 managers for Host Affinity info.
> GroupByContainerCount - THE BIG ONE.
> Rewritten in Java and it now implements BalancingTaskNameGrouper.
> GroupByContainerCountFactory - Rewritten in Java.
> TestStorageRecovery - The old test depended on the order of the partitions in a container. Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java, with lots of tests added for balance().
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the new functionality without breaking backward compatibility.
> TaskAssignmentManager - Coordinator stream manager for the task-to-container mapping.
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with taskname groupers, old and new.
> SetTaskContainerMapping - Coordinator stream message for the task-to-container mapping.
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java acf93525ea5c97df187bbe7977e2ae9fea65b32b
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java 211b64215f26db49cd4411ff3fb41231145307d7
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 4d093b500b7f3b582446634ced3e9d0b28371616
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 06a96ad6ed786c22924017f894413bfa1ea34c06
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java e0d4aa1016d61ce328d7ff74b58f7b8f7682f386
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java 429573b480112c7491303dc410d78f37a308c4a7
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 6e9c6fa579a5901000bea0601c771783d8334f0e
> 
> Diff: https://reviews.apache.org/r/45144/diff/
> 
> 
> Testing
> -------
> 
> A bunch of new unit tests have been added.
> 
> Also tested with a test job. The task mapping (initially missing) is added the first time the job is run. It is then used as expected to reduce task reassignment as the container count was adjusted from 4->3->5 on subsequent runs.
> 
> 
> Thanks,
> 
> Jake Maes
> 
>