> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 78
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301262#file1301262line78>
> >
> > final :).
> >
> > Given that you're not doing cas, it would be sufficient to use a
> > volatile for isRunning instead.
Nice find. I've fixed it to use a volatile boolean. However, I'm not modifying the existing code in https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/AbstractContainerAllocator.java since it's going away.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 111
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301262#file1301262line111>
> >
> > This probably doesn't warrant logging, at least not at the info level.
> > Typically you would reinterrupt here which would cause the next blocking
> > call to throw InterruptedException.

Fixed :)


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java, line 70
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301262#file1301262line70>
> >
> > final if possible - especially as this is exposed to anything in the
> > package.

I'll make this final.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 108
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line108>
> >
> > This should be volatile. The class says it is not thread safe, but I
> > don't see an obvious way for code calling onError to ensure that the
> > thread running run will see this change.

Good point. I've changed this to a volatile boolean, exceptionOccured.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 121
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line121>
> >
> > I would make this a final AtomicBoolean and cas(false, true) at the
> > beginning of run.
> > Otherwise it could be possible to run this twice even
> > when synchronized, e.g.:
> >
> > ```
> > synchronized (lock)
> > {
> >   Thread t1 = new Thread(coordinator);
> >   t1.start();
> >   Thread t2 = new Thread(coordinator);
> >   t2.start();
> > }
> > ```
> >
> > There is no happens-before constraint between the execution of run in
> > t1 and the execution of run in t2 (they are both done on threads that don't
> > hold the lock), so the check may fail.

Great point. I've modified this to use a CAS as suggested.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 153
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line153>
> >
> > A few other fields here are trivial opportunities for being pulled up
> > to the constructor and made final.

Done. I've pulled fields into the class.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, lines 163-164
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line163>
> >
> > It looks like this could be pulled into the constructor. Benefits:
> >
> > 1. We can make the processManager final.
> > 2. We don't need to verify that processManager is not null in shutdown.
> > 3. We can fail early (at construction time) if the configuration is
> > invalid.

Thanks for explaining. Fixed. This code is refactored from SamzaAppMaster.scala. However, I'm not changing the original code in SamzaAppMaster.scala.
(It will go away soon.)


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, line 191
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line191>
> >
> > Yes :)

Done :) Thanks for confirming.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java, lines 246-263
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301263#file1301263line246>
> >
> > How paranoid do we need to be here? Do any of these interact with
> > external systems? If so, I would guard everything on the shutdown path so
> > that we're sure that a RuntimeException or Error doesn't cause us to miss
> > one of these systems.

I've guarded everything in the shutdown path so that we shut down all components.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java, line 74
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301266#file1301266line74>
> >
> > It looks like this should be an interface.

I played around with 3 ideas:

1. Making ContainerProcessManager and Callback separate interfaces. Yarn and Mesos implementations will implement them.
2. Making ContainerProcessManager an abstract class that takes a Callback in its constructor, with Callback as a static nested interface. Yarn and Mesos implementations will extend ContainerProcessManager. (This is the design I went with.)
3. Making ContainerProcessManager and Callback separate interfaces, and creating a separate AbstractContainerProcessManager abstract class that implements ContainerProcessManager. Yarn and Mesos implementations will extend AbstractContainerProcessManager.

My goals were to:

A. Ensure that an instance of a ContainerProcessManager implementation is always created with a callback.
I considered including a registerCallback(Callback) in the ContainerProcessManager interface. But providing the callback during construction seemed cleaner. It makes the lifecycle easier to reason about (for example, we could make the callback final).

B. Avoid breaking clients if we add more methods to the ContainerProcessManager. Having it as an abstract class makes it easier to provide default implementations (until JDK8 https://docs.oracle.com/javase/tutorial/java/IandI/defaultmethods.html :D ).

Of options 2 and 3 (both satisfy the above goals), 2 seemed simpler. Another nice effect: it associates the Callback as a static interface inside ContainerProcessManager (a Callback is always meant to be used with a ContainerProcessManager and, conversely, a ContainerProcessManager is always meant to be used by providing a Callback).


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java, line 38
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301267#file1301267line38>
> >
> > I sprinkled some comments on threading below. After a few of those
> > (which I'll keep for reference), I would suggest instead that we
> > re-evaluate this class as a whole. We can either:
> >
> > 1. Determine that everything can be done such that it appears atomic
> > without a lock, in which case we use a ConcurrentLinkedQueue in
> > allocatedContainers, ensure that SamzaResource and SamzaResourceRequest are
> > immutable or they are copied on update, and drop all synchronized
> > statements.
> > 2. Determine that we actually do need locks around critical sections
> > and thus drop the additional layer of locking in CHM and PBQ.

I've refactored the ContainerRequestState class to be thread-safe. I ended up going with (2). Even though (1) may be more efficient, I did not want to make several changes to the existing logic (prior to this refactor).
See https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerRequestState.java . I'm not porting these changes back to the existing yarn/ContainerRequestState, as it would be throw-away work.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java, line 46
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301267#file1301267line46>
> >
> > This looks almost thread safe, assuming thread safety of all of the
> > contained objects. If that assumption holds you could use a
> > ConcurrentLinkedQueue.
> >
> > However, if there is something stateful going on with the objects
> > contained here or below then it would make more sense to drop CHM as
> > proposed.

I've dropped the CHM and made it a plain Map. All accesses have to be guarded by a lock anyway, so we might as well get rid of the additional level of locking. Another change is that methods now return a copy instead of the original object (so that callers can freely modify the copy).


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java, line 75
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301267#file1301267line75>
> >
> > I generally suggest that you don't use synchronized on methods. The
> > reason is that there is nothing preventing someone holding an instance of
> > ContainerRequestState from using it as a lock elsewhere, which could cause
> > deadlocks or a degradation in performance that is hard to track down. The
> > alternatives would be to either use a private final Object as a lock or use
> > something like a ReentrantLock.

Great point! I've changed it to use a final object as a lock.
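For reference, here is a rough sketch of the private-lock pattern discussed above. The class, field, and method names (RequestStateSketch, requestsPerHost, addRequest, snapshot) are made up for illustration and are not the actual ContainerRequestState code; the sketch only shows a plain HashMap guarded by one private final lock, with getters returning copies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a request-state class; not the real Samza code.
final class RequestStateSketch {
  // Private lock object: code outside this class cannot synchronize on it,
  // unlike `this` when methods are declared synchronized.
  private final Object lock = new Object();

  // Plain HashMap: every access happens while holding `lock`, so a
  // ConcurrentHashMap would only add a second, redundant layer of locking.
  private final Map<String, Integer> requestsPerHost = new HashMap<>();

  void addRequest(String host) {
    synchronized (lock) {
      // The read-modify-write happens under a single lock acquisition.
      requestsPerHost.merge(host, 1, Integer::sum);
    }
  }

  Map<String, Integer> snapshot() {
    synchronized (lock) {
      // Return a copy so callers cannot mutate the internal map.
      return new HashMap<>(requestsPerHost);
    }
  }
}
```

A caller that modifies the map returned by snapshot() only changes its own copy; the guarded state is untouched.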
> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java, line 93
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301267#file1301267line93>
> >
> > If we wanted to get rid of synchronization here, you could use
> > putIfAbsent to make this an atomic operation. As is, this unnecessarily
> > involves two lock acquisitions, which would go away if we switch to a
> > regular HashMap.

The newer change removes the CHM and protects the map with a single lock. We have to use the lock anyway, so we might as well get rid of the CHM to remove the extra layer of locking.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java, line 30
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301269#file1301269line30>
> >
> > final and...

Good find. Fixed!


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java, line 34
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301269#file1301269line34>
> >
> > final.
> >
> > These fields are private and the only accessors are read-only, so using
> > final makes this object immutable and buys better guarantees about
> > visibility across threads
> > (https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.5)

Fixed :) I'm not porting these changes back to https://github.com/apache/samza/blob/master/samza-yarn/src/main/java/org/apache/samza/job/yarn/ContainerFailure.java (as the class will exist only for a short time).


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java, line 36
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301270#file1301270line36>
> >
> > Agreed. This is a bit of a mix of atomic and mutable state. Either make it
> > thread safe (e.g.
> > lock down mutability of fields), ensure that remaining
> > mutable state can be updated atomically, or decide it is not intended to be
> > thread safe and get rid of additional locks / volatile accesses that add
> > cost with no gain. Former is preferred :).
>
> Jagadish Venkatraman wrote:
> Great feedback Chris! I could not agree more! :-)
>
> The SamzaAppState class is currently a source of major problems. I did not
> want to touch it (as it was not scoped in this refactoring). Upon digging
> further, I realize the problem of making this thread-safe/private is slightly
> involved.
>
> 1. There is a jobCoordinator object that is exposed publicly as a part of
> SamzaAppState. The jobCoordinator in turn exposes a nested jobModel instance
> directly through its accessor. The JobModel embeds a LocalityManager that
> mutates state during some public method calls. Hence, the jobModel instance
> is *not* thread-safe and is shared concurrently across the UI threads, the
> HTTP server threads in the queued thread pool, and the SamzaAppMaster thread.
> (Created SAMZA-899 to make the JobModel immutable)
>
> 2. There are a bunch of state data structures that are publicly exposed
> in SamzaAppState. These must be made thread-safe and moved behind accessors.
> These public global variables could be mutated everywhere in Samza without
> regard for safety/visibility or correctness. For example, there is an integer
> containerCount that is public which is manipulated by both the metrics
> reporter and the callback threads. (I created SAMZA-901 to track this)
>
> I will work on both of these ASAP.
>
> Jagadish Venkatraman wrote:
> Just a clarification:
> 1. The JobModel instance is shared concurrently as stated in [1]. This
> presents a source of *potential* problems. (I believe there is not an actual
> bug in the JobModel )
> 2. The containerCount is a public int that *could* be manipulated by both
> the reporter and callback.
> I believe the current interaction does not have
> any races (since count is just set at the startup once). But having it as a
> public non-final int *could* be a source of potential problems if it was
> modified elsewhere.
>
> Chris Pettitt wrote:
> For #2, the state of containerCount in onContainerCompleted is undefined
> unless start transitively "happens-before" onContainerCompleted. It may be
> that that holds, but it is not obvious to me (without spending more time
> working out the call graph). If it's not a problem now it could become one
> with a seemingly innocent refactoring.

Agreed! I've made containerCount an atomic integer. I believe that solves the visibility and atomicity problems for that field for now. Long term, I believe SamzaAppState should be thread-safe (using copy-on-write), with private final variables and accessors. However, making this thread-safe/private is a more involved change. (It will be a separate effort altogether; SAMZA-902 tracks this.)


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java, line 27
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301271#file1301271line27>
> >
> > Fortunately this one is mostly a non-issue these days, but...
> >
> > It is a best practice to add an explicit serialVersionUID to Exceptions
> > because they are Serializable. At one point we actually serialized out Java
> > objects all over the place and trivial, backwards compatible changes would
> > break the ability to load these objects. It should be a non-issue now, but
> > since it is such a simple change I would suggest doing it anyway.

Good point. I've added a serialVersionUID to exceptions.
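To make the serialVersionUID change concrete, here is a minimal sketch. LaunchFailedException is a hypothetical name used only for illustration, and the choice of RuntimeException as the base class and of 1L as the id are assumptions, not necessarily what SamzaContainerLaunchException does.

```java
// Hypothetical exception type; the real class and its base type may differ.
final class LaunchFailedException extends RuntimeException {
  // Explicit id: without it the JVM derives one from the class shape, so
  // even a compatible edit could stop previously serialized instances
  // from being loaded.
  private static final long serialVersionUID = 1L;

  LaunchFailedException(String message, Throwable cause) {
    super(message, cause);
  }
}
```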
> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java, line 51
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301272#file1301272line51>
> >
> > Should host be considered as part of equality and hashCode?

A resource represents a slice of a host. My assumption was that a resource (such a slice) can be uniquely identified by its resourceID, i.e. resources on the same host will have different IDs anyway (for example, they may differ in memory, CPU, disk, etc.). Do you think that's a reasonable assumption?


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java, line 61
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301273#file1301273line61>
> >
> > This is more of a stylistic opinion that you can choose to ignore: I
> > typically put static fields above instance level fields in a class. I think
> > this was done pretty consistently in other parts of the code I've reviewed
> > too.

Thanks for the feedback. I agree that it's good to be consistent with the placement of statics.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java, lines 42-44
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301274#file1301274line42>
> >
> > Please make these private if possible. Otherwise, they can be changed
> > by any other class in the package without going through the setter methods.
> > It makes it trickier to review, at the very least :).
> >
> > Also, this looks like a good candidate for an immutable class, using
> > copy on write if necessary.

I've made these private and final. Thanks for the feedback!
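As an illustration of the immutable, copy-on-write shape suggested above, here is a small sketch. ResourceStatusSketch and its fields (resourceId, diagnostics, exitCode) are assumed names chosen for the example and may not match the real SamzaResourceStatus.

```java
// Hypothetical immutable status object; field names are assumptions.
final class ResourceStatusSketch {
  // private + final: no other class in the package can reassign these, and
  // final fields get the JLS 17.5 visibility guarantees across threads.
  private final String resourceId;
  private final String diagnostics;
  private final int exitCode;

  ResourceStatusSketch(String resourceId, String diagnostics, int exitCode) {
    this.resourceId = resourceId;
    this.diagnostics = diagnostics;
    this.exitCode = exitCode;
  }

  String getResourceId() { return resourceId; }
  String getDiagnostics() { return diagnostics; }
  int getExitCode() { return exitCode; }

  // Copy on write: an "update" returns a new instance and leaves this one
  // untouched, so instances can be shared between threads without locking.
  ResourceStatusSketch withExitCode(int newExitCode) {
    return new ResourceStatusSketch(resourceId, diagnostics, newExitCode);
  }
}
```

The trade-off is one extra allocation per update in exchange for not having to reason about setters being called from other threads.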
> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java, line 43
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301275#file1301275line43>
> >
> > Not thread safe.

I've added docs on thread-safety that call out that this class is not thread-safe.


> On March 16, 2016, 8:48 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java, line 79
> > <https://reviews.apache.org/r/44920/diff/1/?file=1301275#file1301275line79>
> >
> > This should be volatile, especially as I believe it is being used to
> > convey state across threads (e.g. whatever calls onContainerCompleted and
> > the main thread in ClusterBasedJobCoordinator).

Agreed, good find :) I've made it volatile for visibility.


- Jagadish


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/44920/#review123902
-----------------------------------------------------------


On March 16, 2016, 6:23 p.m., Jagadish Venkatraman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/44920/
> -----------------------------------------------------------
>
> (Updated March 16, 2016, 6:23 p.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Jake Maes, Yi Pan
> (Data Infrastructure), Navina Ramesh, and Xinyu Liu.
>
>
> Repository: samza
>
>
> Description
> -------
>
> Samza currently has tight coupling with Yarn. This makes it impossible to
> integrate with other resource managers like Mesos, or to run standalone
> without any resource manager. This RB is a step to implementing SAMZA-881.
>
> Design Doc:
> https://issues.apache.org/jira/secure/attachment/12790540/SamzaJobCoordinatorRe-designProposal.pdf
>
> 1. Proposed new APIs for a resource manager to integrate with Samza.
> (SAMZA-881)
>   - Defined the ContainerProcessManager abstraction, SamzaResource,
>     SamzaResourceRequest.
>   - Re-wrote the SamzaAppMaster into a ClusterBasedJobCoordinator.
>   - Re-wrote yarn specific request logic by abstracting it into a
>     YarnContainerManager.
> 2. Defined a ClusterManagerConfig to handle configurations independent of
> Yarn/Mesos.
> 3. Made Samza's cluster interaction independent of Yarn. This separates Samza
> specific components into samza-core and Yarn components into samza-yarn.
> 4. Readability improvements to the existing code base.
>   - Added docs for most methods, member variables and classes (including on
>     thread-safety)
>   - Made internal variables final to document intent, visibility across
>     threads. (trivially by adding modifiers, or by changing where they're
>     initialized.)
> 5. Refactored JobCoordinator into a JobModelReader.
>
> TODO: Can go into the upcoming release. (P0)
> 1. Refactor the UI state variables and tests. Port some method re-orgs from
> SAMZA-867 into here.
> 2. Revise packaging structure.
> 4. Document new configs.
> 5. Rename run-am.sh to run-coordinator.sh, Delete all files in the
> non-refactored namespace. (For unit-testing, these files continue to exist)
>
> TODO: (P1)
> 1. Build Mesos integration for Samza. Should be simpler to integrate with the
> newer APIs.
>   - I started on this, and I plan to refine and post an RB in one of the
>     hack-days.
> 2. Refactor the SamzaAppState class to provide more accessors and eliminate
> public variables. (This was a consequence of the already existing design
> which I've tried to be compatible with)
>
> TODO: I plan to track these with JIRAs so that they can be done later. (P2)
> 1. Get rid of the HTTP Server in the JobCoordinator
> 2. Make YarnJobCoordinator implement the JobCoordinator API as SAMZA-881.
>
>
> Diffs
> -----
>
> samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ClusterBasedJobCoordinator.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManagerFactory.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerProcessManager.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ContainerRequestState.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/ResourceFailure.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaAppState.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaContainerLaunchException.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceRequest.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResourceStatus.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/clustermanager/SamzaTaskManager.java PRE-CREATION
> samza-core/src/main/java/org/apache/samza/config/ClusterManagerConfig.java PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/coordinator/JobModelReader.scala PRE-CREATION
> samza-core/src/main/scala/org/apache/samza/metrics/SamzaAppMasterMetrics.scala PRE-CREATION
> samza-shell/src/main/bash/run-am.sh 9545a96953baaff17ad14962e02bc12aadbb1101
> samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnAppState.java PRE-CREATION
> samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManager.java PRE-CREATION
> samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerManagerFactory.java PRE-CREATION
> samza-yarn/src/main/java/org/apache/samza/job/yarn/refactor/YarnContainerRunner.java PRE-CREATION
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnAppMasterListener.scala 6bf3046a1ae4ed8f57500acae763184084ad0e09
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaAppMasterService.scala PRE-CREATION
> samza-yarn/src/main/scala/org/apache/samza/job/yarn/refactor/SamzaYarnAppMasterLifecycle.scala PRE-CREATION
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterLifecycle.scala 30cf34fe1fd3f74537d16e8a51b467cd50835357
> samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaAppMasterService.scala 7f5d9f4af088589d4287c26737bae25567c157d7
>
> Diff: https://reviews.apache.org/r/44920/diff/
>
>
> Testing
> -------
>
> 1. Tested by running test jobs on Yarn clusters of varying sizes, from 1 node
> to 36 nodes.
> 2. Tested for failures by killing containers and ensuring they were brought
> up again.
>
> TODO:
> 1. Refactor all unit tests, and ensure updates to state variables are
> consistent with the current design.
> 2. More testing with test jobs on clusters.
>
>
> Thanks,
>
> Jagadish Venkatraman
>
>