-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/45144/#review125157
-----------------------------------------------------------


samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java (line 99)
<https://reviews.apache.org/r/45144/#comment187940>

    Question: why does the minimum movement apply only to the case where the container number changes? Even if the container numbers are the same, won't it be beneficial to just honor the previous task assignment for stateful jobs, instead of re-computing?


samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java (line 227)
<https://reviews.apache.org/r/45144/#comment188112>

    What if taskNamesToAssign.isEmpty()==true here?


samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java (line 231)
<https://reviews.apache.org/r/45144/#comment188111>

    What if at the end, taskNamesToAssign is not empty?


samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java (line 245)
<https://reviews.apache.org/r/45144/#comment188110>

    It seems like the logic here is assuming:
    1) we always try to "fill up" each container from 0-n based on average count
    2) in successive runs, there can't be any task-container mapping sequence like:
       3 tasks -> container 0
       3 tasks -> container 1
       1 task -> container 2
       3 tasks -> container 3
    In the above case, when a new container 4 is added, there will not be a balanced assignment, since container 4 will get 3 tasks.

    I would suggest that we don't assume what the result of the previous balance would be, and that the calculation is always based on the whole set of tasks <-> containers. A simple logic would be:
    - initialize the set of re-assignable tasks (i.e. from containers that are removed, from containers that are over the expected average)
    - sort all containers based on the current assigned number of tasks, minus the re-assignable tasks
    - start assigning the re-assignable tasks from the end of the sorted container list, until it reaches the expected average.
    - stop the whole algorithm once the set of re-assignable tasks is empty.


samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java (line 62)
<https://reviews.apache.org/r/45144/#comment188119>

    Do we have a use case for write-only SamzaTaskAssignmentManager? We needed it for LocalityManager because SamzaContainer includes an instance of LocalityManager which is write-only. If we don't have the write-only use case, we should just remove the write-only flag.


samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java (line 37)
<https://reviews.apache.org/r/45144/#comment188120>

    nit: the key in the example shows containerId = 1 and the values show containerId = 139. They should be the same, right?


- Yi Pan (Data Infrastructure)


On March 24, 2016, 9:33 p.m., Jake Maes wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/45144/
> -----------------------------------------------------------
>
> (Updated March 24, 2016, 9:33 p.m.)
>
>
> Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan
> (Data Infrastructure).
>
>
> Repository: samza
>
>
> Description
> -------
>
> Persist the task-to-container mapping in the coordinator stream and use it to
> minimize the reassignment when the container count changes.
>
> A new BalancingTaskNameGrouper interface exposes the balance() method, which
> can be implemented pluggably.
> GroupByContainerCount has been rewritten in Java and the balance()
> functionality added. This is because the balance logic is specific to a
> grouper.
>
> Detailed changes:
> import-control.xml - Update imports that weren't needed in Scala and add some
> for tests.
> LocalityManager.java - Add TaskAssignment manager. This mostly just keeps the
> JobCoordinator code cleaner, but also associates the 2 managers for Host
> Affinity info.
> GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements
> BalancingTaskNameGrouper
> GroupByContainerCountFactory - Rewritten in Java
> TestStorageRecovery - Old test depended on the order of the partitions in a
> container. Now it doesn't.
> TestGroupByContainerCount - Rewritten in Java and lots of tests added for
> balance()
> BalancingTaskNameGrouper - New interface for the balance method. Exposes the
> new functionality without breaking backward compatibility
> TaskAssignmentManager - Coordinator stream manager for task-to-container
> mapping
> TaskNameGroupBalancer - Bridges the task mapping (balance) capability with
> taskname groupers, old and new
> SetTaskContainerMapping - Coordinator stream message for the task-to-container
> mapping
>
>
> Diffs
> -----
>
>   checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c
>   samza-core/src/main/java/org/apache/samza/container/LocalityManager.java acf93525ea5c97df187bbe7977e2ae9fea65b32b
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java 211b64215f26db49cd4411ff3fb41231145307d7
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java 4d093b500b7f3b582446634ced3e9d0b28371616
>   samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1
>   samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 06a96ad6ed786c22924017f894413bfa1ea34c06
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java e0d4aa1016d61ce328d7ff74b58f7b8f7682f386
>   samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java 429573b480112c7491303dc410d78f37a308c4a7
>   samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc
>   samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala 6e9c6fa579a5901000bea0601c771783d8334f0e
>
> Diff: https://reviews.apache.org/r/45144/diff/
>
>
> Testing
> -------
>
> A bunch of new unit tests have been added.
>
> Also tested with a test job. The task mapping (initially missing) is added
> the first time the job is run. It is then used as expected to reduce task
> reassignment as the container count was adjusted from 4->3->5 on subsequent
> runs.
>
>
> Thanks,
>
> Jake Maes
>
>
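
For reference, the rebalancing steps suggested in the comment on GroupByContainerCount.java (line 245) could be sketched roughly as below. This is only an illustration under stated assumptions, not code from the patch or from Samza: the class name RebalanceSketch, the method balance, the Map<Integer, List<String>> shape for the task-to-container mapping, the reading of "end of the sorted list" as the least loaded containers, and the choice to let the currently most loaded containers keep the remainder task are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration; not part of Samza or of this patch.
class RebalanceSketch {

  /**
   * previous: container id -> task names from the last run (assumed shape).
   * Returns a balanced mapping for container ids 0..containerCount-1.
   */
  static Map<Integer, List<String>> balance(Map<Integer, List<String>> previous, int containerCount) {
    if (containerCount <= 0) {
      throw new IllegalArgumentException("containerCount must be positive");
    }
    Map<Integer, List<String>> result = new TreeMap<>();
    for (int id = 0; id < containerCount; id++) {
      result.put(id, new ArrayList<>());
    }

    // Step 1a: tasks on removed containers are re-assignable; the rest stay put for now.
    Deque<String> reassignable = new ArrayDeque<>();
    int totalTasks = 0;
    for (Map.Entry<Integer, List<String>> entry : previous.entrySet()) {
      totalTasks += entry.getValue().size();
      List<String> kept = result.get(entry.getKey());
      if (kept != null) {
        kept.addAll(entry.getValue());
      } else {
        reassignable.addAll(entry.getValue());
      }
    }

    // Step 2: sort containers by current task count, most loaded first.
    List<Integer> ids = new ArrayList<>(result.keySet());
    ids.sort(Comparator.comparingInt((Integer id) -> result.get(id).size()).reversed());

    // Expected average; the first `extra` (most loaded) containers may hold one more task.
    int base = totalTasks / containerCount;
    int extra = totalTasks % containerCount;
    Map<Integer, Integer> capacity = new HashMap<>();
    for (int i = 0; i < ids.size(); i++) {
      capacity.put(ids.get(i), base + (i < extra ? 1 : 0));
    }

    // Step 1b: tasks above a container's capacity also become re-assignable.
    for (int id : ids) {
      List<String> tasks = result.get(id);
      while (tasks.size() > capacity.get(id)) {
        reassignable.add(tasks.remove(tasks.size() - 1));
      }
    }

    // Step 3: fill from the end of the sorted list (least loaded) up to capacity.
    for (int i = ids.size() - 1; i >= 0 && !reassignable.isEmpty(); i--) {
      List<String> tasks = result.get(ids.get(i));
      while (tasks.size() < capacity.get(ids.get(i)) && !reassignable.isEmpty()) {
        tasks.add(reassignable.poll());
      }
    }
    // Step 4: capacities sum to totalTasks, so the re-assignable pool is empty here.
    return result;
  }
}
```

With the 3/3/1/3 mapping from the comment and a new container 4 added, this sketch moves three tasks and leaves every container with two. Because the capacities always sum to the total task count, the re-assignable set cannot be left non-empty at the end, which also addresses the question raised on line 231.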