-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/#review123902
-----------------------------------------------------------
This is a deceptively big RB :) I got to about SamzaTaskManager, but still need to review from there. This at least gets you some early feedback up to that point and ensures I don't lose any comments.


samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 46)
<https://reviews.apache.org/r/44920/#comment186216>

Discussed with Jagadish offline, but summarizing here: I think we could simplify some of the interactions across classes if we make this a concrete task that takes a container allocator strategy interface (e.g. standard or host-aware). A few nice properties fall out:

1. The allocators don't need to know the inner workings of AbstractContainerAllocator (ACA). In fact, the implementation of ACA can change without breaking allocators.
2. It is easy to test just the allocator strategy in isolation, because it becomes more functional in nature (e.g. no direct dependency on state in ACA).
3. It is easier to code review due to fewer cross-class interactions ;)


samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 66)
<https://reviews.apache.org/r/44920/#comment186191>

It looks like this could be final.


samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 70)
<https://reviews.apache.org/r/44920/#comment186192>

final if possible, especially as this is exposed to anything in the package.


samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 78)
<https://reviews.apache.org/r/44920/#comment186193>

final :). Given that you're not doing CAS (compare-and-set), it would be sufficient to use a volatile for isRunning instead.


samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java (line 111)
<https://reviews.apache.org/r/44920/#comment186201>

This probably doesn't warrant logging, at least not at the info level.
Typically you would reinterrupt here, which would cause the next blocking call to throw InterruptedException.


samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 108)
<https://reviews.apache.org/r/44920/#comment186223>

This should be volatile. The class says it is not thread safe, but I don't see an obvious way for code calling onError to ensure that the thread running run will see this change.


samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 121)
<https://reviews.apache.org/r/44920/#comment186229>

I would make this a final AtomicBoolean and call compareAndSet(false, true) at the beginning of run. Otherwise it would be possible to run this twice even when the threads are started under a lock, e.g.:

```
synchronized (lock) {
  // Both threads are started while holding the lock, but each run() executes outside it.
  Thread t1 = new Thread(coordinator);
  t1.start();
  Thread t2 = new Thread(coordinator);
  t2.start();
}
```

There is no happens-before constraint between the execution of run in t1 and the execution of run in t2 (both execute on threads that don't hold the lock), so the check may fail and both threads can proceed.


samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 153)
<https://reviews.apache.org/r/44920/#comment186230>

A few other fields here are trivial opportunities for being pulled up to the constructor and made final.


samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (lines 163 - 164)
<https://reviews.apache.org/r/44920/#comment186220>

It looks like this could be pulled into the constructor. Benefits:

1. We can make the processManager final.
2. We don't need to verify that processManager is not null in shutdown.
3. We can fail early (at construction time) if the configuration is invalid.
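The first AbstractContainerAllocator comment (line 46) proposes a concrete allocator that delegates its matching decision to a strategy interface. A minimal sketch of that shape, assuming simplified stand-ins for SamzaResourceRequest and SamzaResource; every name here (AllocationStrategy, StandardStrategy, HostAwareStrategy, Request, Resource, allocateOnce) is hypothetical rather than Samza's API, and the host-aware strategy simply declines when the preferred host is unavailable, omitting any fallback to other hosts.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class AllocatorStrategySketch {

  /** Hypothetical stand-in for SamzaResourceRequest. */
  static final class Request {
    final String containerId;
    final String preferredHost;
    Request(String containerId, String preferredHost) {
      this.containerId = containerId;
      this.preferredHost = preferredHost;
    }
  }

  /** Hypothetical stand-in for SamzaResource. */
  static final class Resource {
    final String resourceId;
    final String host;
    Resource(String resourceId, String host) {
      this.resourceId = resourceId;
      this.host = host;
    }
  }

  /** Hypothetical strategy: gets everything as arguments, returns a resource or null. */
  interface AllocationStrategy {
    Resource choose(Request request, List<Resource> available);
  }

  /** Takes the first available resource, ignoring host preference. */
  static final class StandardStrategy implements AllocationStrategy {
    public Resource choose(Request request, List<Resource> available) {
      return available.isEmpty() ? null : available.get(0);
    }
  }

  /** Only accepts a resource on the preferred host (no fallback in this sketch). */
  static final class HostAwareStrategy implements AllocationStrategy {
    public Resource choose(Request request, List<Resource> available) {
      for (Resource r : available) {
        if (r.host.equals(request.preferredHost)) {
          return r;
        }
      }
      return null;
    }
  }

  /** Concrete allocator: owns all state, delegates only the matching decision. */
  static final class ContainerAllocator {
    private final AllocationStrategy strategy;
    private final Deque<Request> pending = new ArrayDeque<>();
    private final List<Resource> available = new ArrayList<>();

    ContainerAllocator(AllocationStrategy strategy) {
      this.strategy = strategy;
    }

    void addRequest(Request r) { pending.add(r); }
    void addResource(Resource r) { available.add(r); }

    /** One matching pass; returns "containerId->resourceId" for each match made. */
    List<String> allocateOnce() {
      List<String> launched = new ArrayList<>();
      Iterator<Request> it = pending.iterator();
      while (it.hasNext()) {
        Request req = it.next();
        Resource chosen = strategy.choose(req, available);
        if (chosen != null) {
          available.remove(chosen);
          it.remove();
          launched.add(req.containerId + "->" + chosen.resourceId);
        }
      }
      return launched;
    }
  }

  public static void main(String[] args) {
    ContainerAllocator allocator = new ContainerAllocator(new HostAwareStrategy());
    allocator.addRequest(new Request("0", "host-a"));
    allocator.addRequest(new Request("1", "host-b"));
    allocator.addResource(new Resource("r1", "host-b"));
    System.out.println(allocator.allocateOnce()); // [1->r1]
  }
}
```

Because a strategy sees only its arguments, it can be unit-tested with plain lists and no allocator at all, which is property 2 in the comment; the allocator's internals can also change without touching the strategies.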
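The ClusterBasedJobCoordinator comments (lines 108 and 121) and the earlier note about reinterrupting fit together in one small sketch. It assumes a hypothetical coordinator whose run() polls until onError() or stop() is called; the class, its stop latch and the run counter are illustrative only, not the actual coordinator.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class RunOnceCoordinatorSketch implements Runnable {
  // Final AtomicBoolean: compareAndSet makes "check if started, then mark started" a single
  // atomic step, so only one of several threads calling run() gets past the guard.
  private final AtomicBoolean started = new AtomicBoolean(false);

  // volatile: a write by the thread calling onError() becomes visible to the thread in run().
  private volatile boolean hasError = false;

  // Hypothetical stop signal, plus a counter used only to demonstrate the run-once guard.
  private final CountDownLatch stopSignal = new CountDownLatch(1);
  private final AtomicInteger bodiesExecuted = new AtomicInteger();

  @Override
  public void run() {
    if (!started.compareAndSet(false, true)) {
      return; // another thread already ran (or is running) this coordinator
    }
    bodiesExecuted.incrementAndGet();
    try {
      // Poll until an error is reported or stop() is called.
      while (!hasError && !stopSignal.await(10, TimeUnit.MILLISECONDS)) {
        // periodic coordinator work would go here
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag instead of only logging, so the interruption is not
      // swallowed: the next interruptible blocking call on this thread will throw.
      Thread.currentThread().interrupt();
    }
  }

  public void onError() {
    hasError = true;
  }

  public void stop() {
    stopSignal.countDown();
  }

  int runCount() {
    return bodiesExecuted.get();
  }

  public static void main(String[] args) throws InterruptedException {
    RunOnceCoordinatorSketch coordinator = new RunOnceCoordinatorSketch();
    Thread t1 = new Thread(coordinator);
    Thread t2 = new Thread(coordinator);
    t1.start();
    t2.start();
    coordinator.onError(); // seen by whichever thread won the guard; its loop then exits
    t1.join();
    t2.join();
    System.out.println("run bodies executed: " + coordinator.runCount()); // 1
  }
}
```

Whichever thread loses the compareAndSet returns immediately, so the body runs exactly once regardless of how the threads were started.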


samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 191)
<https://reviews.apache.org/r/44920/#comment186231>

Yes :)


samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (line 194)
<https://reviews.apache.org/r/44920/#comment186233>

Jake already pointed out some (all?) of these, so I will comment just once that I agree they should be pulled. We should use the logger to capture stack traces.


samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java (lines 246 - 263)
<https://reviews.apache.org/r/44920/#comment186234>

How paranoid do we need to be here? Do any of these interact with external systems? If so, I would guard each step on the shutdown path so that we're sure a RuntimeException or Error doesn't cause us to skip shutting down one of these systems.


samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java (line 74)
<https://reviews.apache.org/r/44920/#comment186235>

It looks like this should be an interface.


samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java (line 38)
<https://reviews.apache.org/r/44920/#comment186239>

I sprinkled some comments on threading below. After a few of those (which I'll keep for reference), I would suggest instead that we re-evaluate this class as a whole. We can either:

1. Determine that everything can be done such that it appears atomic without a lock, in which case we use a ConcurrentLinkedQueue in allocatedContainers, ensure that SamzaResource and SamzaResourceRequest are immutable or are copied on update, and drop all synchronized statements.
2. Determine that we actually do need locks around critical sections, and thus drop the additional layer of locking in CHM (ConcurrentHashMap) and PBQ (PriorityBlockingQueue).
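For the shutdown path (ClusterBasedJobCoordinator, lines 246 - 263) and the note about using the logger for stack traces, one way to be "paranoid" is to guard each stop call individually. A minimal sketch, assuming each component's shutdown can be expressed as a Runnable; the component names are hypothetical, and java.util.logging stands in for whatever logger the project actually uses.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

public class GuardedShutdownSketch {
  private static final Logger LOG = Logger.getLogger(GuardedShutdownSketch.class.getName());

  /**
   * Stops each component in insertion order. A RuntimeException or Error from one component
   * is logged with its stack trace and recorded, and the remaining components are still
   * stopped. Returns the names of the components whose shutdown failed.
   */
  static List<String> shutdownAll(Map<String, Runnable> stopActions) {
    List<String> failed = new ArrayList<>();
    for (Map.Entry<String, Runnable> e : stopActions.entrySet()) {
      try {
        e.getValue().run();
      } catch (Throwable t) {
        // Hand the throwable to the logger instead of calling printStackTrace().
        LOG.log(Level.WARNING, "Failed to stop " + e.getKey(), t);
        failed.add(e.getKey());
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    List<String> stopped = new ArrayList<>();
    Map<String, Runnable> actions = new LinkedHashMap<>();
    actions.put("allocator", () -> stopped.add("allocator"));
    actions.put("metrics", () -> { throw new IllegalStateException("boom"); });
    actions.put("httpServer", () -> stopped.add("httpServer"));
    List<String> failed = shutdownAll(actions);
    System.out.println("stopped=" + stopped + " failed=" + failed);
  }
}
```

Catching Throwable (not just Exception) matches the concern about an Error skipping later steps; a variant could rethrow the first failure once every component has been attempted, so the caller still sees it.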


samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java (line 46)
<https://reviews.apache.org/r/44920/#comment186236>

This looks almost thread safe, assuming thread safety of all of the contained objects. If that assumption holds, you could use a ConcurrentLinkedQueue. However, if there is something stateful going on with the objects contained here or below, then it would make more sense to drop CHM as proposed.


samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java (line 75)
<https://reviews.apache.org/r/44920/#comment186237>

I generally suggest that you don't use synchronized on methods. The reason is that nothing prevents someone holding an instance of ContainerRequestState from using it as a lock elsewhere, which could cause deadlocks or a performance degradation that is hard to track down. The alternatives would be to either use a private final Object as a lock or use something like a ReentrantLock.


samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java (line 93)
<https://reviews.apache.org/r/44920/#comment186238>

If we wanted to get rid of synchronization here, you could use putIfAbsent to make this an atomic operation. As is, this unnecessarily involves two lock acquisitions, which would go away if we switched to a regular HashMap.


samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java (line 30)
<https://reviews.apache.org/r/44920/#comment186240>

final and...


samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java (line 34)
<https://reviews.apache.org/r/44920/#comment186243>

final.
These fields are private and the only accessors are read-only, so making them final makes this object immutable and buys better guarantees about visibility across threads (https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.5).


samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java (line 36)
<https://reviews.apache.org/r/44920/#comment186244>

Agreed. This is a bit of a mix of atomic and mutable state. Either make it thread safe (e.g. lock down mutability of fields and ensure that the remaining mutable state can be updated atomically), or decide it is not intended to be thread safe and get rid of the additional locks / volatile accesses that add cost with no gain. The former is preferred :).


samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java (line 27)
<https://reviews.apache.org/r/44920/#comment186245>

Fortunately this one is mostly a non-issue these days, but... It is a best practice to add an explicit serialVersionUID to exceptions because they are Serializable. At one point we actually serialized Java objects out all over the place, and trivial, backwards-compatible changes would break the ability to load these objects. It should be a non-issue now, but since it is such a simple change I would suggest doing it anyway.


samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java (line 51)
<https://reviews.apache.org/r/44920/#comment186246>

Should host be considered as part of equality and hashCode?


samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java (line 61)
<https://reviews.apache.org/r/44920/#comment186247>

This is more of a stylistic opinion that you can choose to ignore: I typically put static fields above instance-level fields in a class. I think this was done pretty consistently in other parts of the code I've reviewed too.
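The ContainerRequestState comments (lines 38, 46, 75 and 93) describe two internally consistent designs. A simplified sketch of both, tracking hypothetical resource-ID strings per host rather than real SamzaResource objects; the class and method names are illustrative only.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class RequestStateSketch {

  /** Option 1: no locks. The per-host queue is created atomically via putIfAbsent. */
  static final class LockFreeState {
    private final ConcurrentHashMap<String, Queue<String>> allocated = new ConcurrentHashMap<>();

    void addAllocated(String host, String resourceId) {
      Queue<String> fresh = new ConcurrentLinkedQueue<>();
      // putIfAbsent returns the existing queue, or null if ours was installed.
      Queue<String> existing = allocated.putIfAbsent(host, fresh);
      (existing != null ? existing : fresh).add(resourceId);
    }

    int countOn(String host) {
      Queue<String> q = allocated.get(host);
      return q == null ? 0 : q.size();
    }
  }

  /** Option 2: one private lock guards plain collections; callers cannot lock on it. */
  static final class LockedState {
    private final Object lock = new Object();
    private final Map<String, Queue<String>> allocated = new HashMap<>();

    void addAllocated(String host, String resourceId) {
      synchronized (lock) {
        allocated.computeIfAbsent(host, h -> new ArrayDeque<>()).add(resourceId);
      }
    }

    int countOn(String host) {
      synchronized (lock) {
        Queue<String> q = allocated.get(host);
        return q == null ? 0 : q.size();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    LockFreeState lockFree = new LockFreeState();
    LockedState locked = new LockedState();
    Thread[] workers = new Thread[4];
    for (int i = 0; i < workers.length; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < 1000; j++) {
          lockFree.addAllocated("host-a", "r" + j);
          locked.addAllocated("host-a", "r" + j);
        }
      });
      workers[i].start();
    }
    for (Thread t : workers) {
      t.join();
    }
    System.out.println(lockFree.countOn("host-a") + " " + locked.countOn("host-a")); // 4000 4000
  }
}
```

Option 1 allocates a spare queue on every call; ConcurrentHashMap.computeIfAbsent (Java 8+) is also atomic and avoids that. Option 2 only pays for one lock per operation, which is the point of dropping CHM when a critical section is needed anyway.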
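The ResourceFailure, SamzaResource, SamzaResourceStatus and SamzaContainerLaunchException comments all point toward small immutable value types. A hypothetical sketch (not the actual Samza classes) with private final fields, a copy-on-write "with" method instead of setters, equals/hashCode that include host, and an explicit serialVersionUID on an exception.

```java
import java.util.Objects;

public class ImmutableValueSketch {

  /** Hypothetical simplified resource: private final fields, read-only accessors. */
  static final class Resource {
    private final String resourceId;
    private final String host;
    private final int memoryMb;

    Resource(String resourceId, String host, int memoryMb) {
      this.resourceId = Objects.requireNonNull(resourceId);
      this.host = Objects.requireNonNull(host);
      this.memoryMb = memoryMb;
    }

    String getResourceId() { return resourceId; }
    String getHost() { return host; }
    int getMemoryMb() { return memoryMb; }

    /** Copy-on-write "update": returns a new instance and leaves this one unchanged. */
    Resource withHost(String newHost) {
      return new Resource(resourceId, newHost, memoryMb);
    }

    // Host is part of equality here; whether it should be is the open review question.
    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (!(o instanceof Resource)) return false;
      Resource other = (Resource) o;
      return memoryMb == other.memoryMb
          && resourceId.equals(other.resourceId)
          && host.equals(other.host);
    }

    @Override
    public int hashCode() {
      return Objects.hash(resourceId, host, memoryMb);
    }
  }

  /** Exceptions are Serializable, so pin the serialVersionUID explicitly. */
  static class LaunchException extends Exception {
    private static final long serialVersionUID = 1L;

    LaunchException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  public static void main(String[] args) {
    Resource a = new Resource("r1", "host-a", 1024);
    Resource moved = a.withHost("host-b");
    System.out.println(a.getHost() + " " + moved.getHost() + " " + a.equals(moved));
    // prints: host-a host-b false
  }
}
```

Because no field can change after construction, instances can be shared across threads without locks, per the JLS final-field semantics linked in the ResourceFailure comment.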


samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java (lines 42 - 44)
<https://reviews.apache.org/r/44920/#comment186249>

Please make these private if possible. Otherwise, they can be changed by any other class in the package without going through the setter methods. It makes it trickier to review, at the very least :). Also, this looks like a good candidate for an immutable class, using copy-on-write if necessary.


samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java (line 43)
<https://reviews.apache.org/r/44920/#comment186250>

Not thread safe.


samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java (line 79)
<https://reviews.apache.org/r/44920/#comment186254>

This should be volatile, especially as I believe it is being used to convey state across threads (e.g. between whatever calls onContainerCompleted and the main thread in ClusterBasedJobCoordinator).


- Chris Pettitt


On March 16, 2016, 6:23 p.m., Jagadish Venkatraman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44920/
> -----------------------------------------------------------
>
> (Updated March 16, 2016, 6:23 p.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Samza currently has tight coupling with Yarn. This makes it impossible to integrate with other resource managers like Mesos, or to run standalone without any resource manager. This RB is a step toward implementing SAMZA-881.
>
> Design Doc: https://issues.apache.org/jira/secure/attachment/12790540/SamzaJobCoordinatorRe-designProposal.pdf
>
> 1. Proposed new APIs for a resource manager to integrate with Samza. (SAMZA-881)
>   - Defined the ContainerProcessManager abstraction, SamzaResource, SamzaResourceRequest.
>   - Re-wrote the SamzaAppMaster into a ClusterBasedJobCoordinator.
>   - Re-wrote Yarn-specific request logic by abstracting it into a YarnContainerManager.
> 2. Defined a ClusterManagerConfig to handle configurations independent of Yarn/Mesos.
> 3. Made Samza's cluster interaction independent of Yarn. This separates Samza-specific components into samza-core and Yarn components into samza-yarn.
> 4. Readability improvements to the existing code base.
>   - Added docs for most methods, member variables and classes (including on thread-safety).
>   - Made internal variables final to document intent and visibility across threads (trivially by adding modifiers, or by changing where they're initialized).
> 5. Refactored JobCoordinator into a JobModelReader.
>
> TODO: Can go into the upcoming release. (P0)
> 1. Refactor the UI state variables and tests. Port some method re-orgs from SAMZA-867 into here.
> 2. Revise packaging structure.
> 3. Document new configs.
> 4. Rename run-am.sh to run-coordinator.sh; delete all files in the non-refactored namespace. (For unit-testing, these files continue to exist.)
>
> TODO: (P1)
> 1. Build Mesos integration for Samza. Should be simpler to integrate with the newer APIs.
>   - I started on this, and I plan to refine and post an RB in one of the hack-days.
> 2. Refactor the SamzaAppState class to provide more accessors and eliminate public variables. (This was a consequence of the already existing design, which I've tried to be compatible with.)
>
> TODO: I plan to track these with JIRAs so that they can be done later. (P2)
> 1. Get rid of the HTTP Server in the JobCoordinator.
> 2. Make YarnJobCoordinator implement the JobCoordinator API as per SAMZA-881.
>
>
> Diffs
> -----
>
>   samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManagerFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobModelReader.scala PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/metrics/SamzaAppMasterMetrics.scala PRE-CREATION
>   samza-shell/src/main/bash/run-am.sh 9545a96953baaff17ad14962e02bc12aadbb1101
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnAppState.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManager.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManagerFactory.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerRunner.java PRE-CREATION
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala 6bf3046a1ae4ed8f57500acae763184084ad0e09
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaAppMasterService.scala PRE-CREATION
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaYarnAppMasterLifecycle.scala PRE-CREATION
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 30cf34fe1fd3f74537d16e8a51b467cd50835357
>   samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 7f5d9f4af088589d4287c26737bae25567c157d7
>
> Diff: https://reviews.apache.org/r/44920/diff/
>
>
> Testing
> -------
>
> 1. Tested by running test jobs in Yarn clusters of varying sizes, from 1 node to 36 nodes.
> 2. Tested for failures by killing containers, and ensuring they were brought up again.
>
> TODO:
> 1. Refactor all unit tests, and ensure updates to state variables are consistent with the current design.
> 2. More testing with test jobs on clusters.
>
>
> Thanks,
>
> Jagadish Venkatraman
>