> On March 23, 2016, 7:57 a.m., Jagadish Venkatraman wrote: > > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java, > > line 180 > > <https://reviews.apache.org/r/45144/diff/1/?file=1309651#file1309651line180> > > > > This logic is well written :-) > > > > It will really help if you can illustrate with a small example as a > > comment. > > 1. when counts increase. > > 2. when counts decrease. > > Jagadish Venkatraman wrote: > Here's an idea, save it for later if you think it's a total diversion. > But, I believe it's worth investigating. > > How about making the TaskAssignmentManager immutable? That way, the > TaskAssignment manager is given the list of assignments when it starts and > the assignments does not change. > > The following are the advantages: > 1. Currently, the map that tracks <taskNameToContainerId> can be made > final. > 2. The mapping can be initialized once during construction. The > constructor could take care of deleting the old mappings, and re-refreshing > the new mappings into the coordinator stream. > 3. Do we change the map after the JobCoordinator starts/stops containers? > If not, then it makes sense for it to be immutable. > 4. We can fail early during construction time. > 5. Methods that return the taskNameToContainerId can now return a cached > copy as opposed to re-bootstrapping from coordinator stream each time.
I just realized that it's hard to implement this. I believe that the TaskAssignmentManager has been modelled according to the LocalityManager that obtains the container locality. Never mind. - Jagadish ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/45144/#review124812 ----------------------------------------------------------- On March 22, 2016, 1:20 a.m., Jake Maes wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/45144/ > ----------------------------------------------------------- > > (Updated March 22, 2016, 1:20 a.m.) > > > Review request for samza, Navina Ramesh, Jagadish Venkatraman, and Yi Pan > (Data Infrastructure). > > > Repository: samza > > > Description > ------- > > Persist the task-to-container mapping in the coordinator stream and use it to > minimize the reassignment when the container count changes. > > A new BalancingTaskNameGrouper interface exposes the balance() method, which > can be implemented pluggably. > GroupByContainerCount has been rewritten in java and the balance() > functionality added. This is because the balance logic is specific to a > grouper. > > Detailed changes: > import-control.xml - Update imports that weren't needed in scala and add some > for tests. > LocalityManager.java - Add TaskAssignment manager. This mostly just keeps the > JobCoordinator code cleaner, but also associates the 2 managers for Host > Affinity info. > GroupByContainerCount - THE BIG ONE. Rewritten in Java and it now implements > BalancingTaskNameGrouper > GroupByContainerCountFactory - Rewritten in Java > TestStorageRecovery - Old test depended on the order of the partitions in a > container. Now it doesn't. > TestGroupByContainerCount - Rewritten in Java and lots of tests added for > balance() > BalancingTaskNameGrouper - New interface for the balance method. Exposes the > new functionality without breaking backward compatibility > TaskAssignmentManager - Coordinator stream manager for task-to-container > mapping > TaskNameGroupBalancer - Bridges the task mapping (balance) capability with > taskname groupers, old and new > SetTaskContainerMapping - Coordinator strem message for the task-to-container > mapping > > > Diffs > ----- > > checkstyle/import-control.xml 53cb8b447240fea08d98ccfb12ed24bec6cbf67c > samza-core/src/main/java/org/apache/samza/container/LocalityManager.java > acf93525ea5c97df187bbe7977e2ae9fea65b32b > > samza-core/src/main/java/org/apache/samza/container/grouper/task/BalancingTaskNameGrouper.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/container/grouper/task/GroupByContainerCount.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskNameGroupBalancer.java > PRE-CREATION > > samza-core/src/main/java/org/apache/samza/coordinator/stream/AbstractCoordinatorStreamManager.java > 211b64215f26db49cd4411ff3fb41231145307d7 > > samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetContainerHostMapping.java > 4d093b500b7f3b582446634ced3e9d0b28371616 > > samza-core/src/main/java/org/apache/samza/coordinator/stream/messages/SetTaskContainerMapping.java > PRE-CREATION > > samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCount.scala > cb0a3bde15174c53f8eb3c0dbbb4f59dbf2589b1 > > samza-core/src/main/scala/org/apache/samza/container/grouper/task/GroupByContainerCountFactory.scala > 8bbfd639cd9ea1d758d0daa45ce41093c1cb66f6 > samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala > 06a96ad6ed786c22924017f894413bfa1ea34c06 > > samza-core/src/test/java/org/apache/samza/container/grouper/task/TestGroupByContainerCount.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskNameGroupBalancer.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/container/mock/ContainerMocks.java > PRE-CREATION > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamSystemFactory.java > e0d4aa1016d61ce328d7ff74b58f7b8f7682f386 > > samza-core/src/test/java/org/apache/samza/coordinator/stream/MockCoordinatorStreamWrappedConsumer.java > 429573b480112c7491303dc410d78f37a308c4a7 > samza-core/src/test/java/org/apache/samza/storage/TestStorageRecovery.java > 53207ad7e87fe491c6ae95ae6c590c6d5668d3dc > > samza-core/src/test/scala/org/apache/samza/container/grouper/task/TestGroupByContainerCount.scala > 6e9c6fa579a5901000bea0601c771783d8334f0e > > Diff: https://reviews.apache.org/r/45144/diff/ > > > Testing > ------- > > A bunch of new unit tests have been added. > > Also tested with a test job. The task mapping (initially missing) is added > the first time the job is run. It is then used as expected to reduce task > reassignment as the container count was adjusted from 4->3->5 on subsequent > runs. > > > Thanks, > > Jake Maes > >