-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review136357
-----------------------------------------------------------

Biggest takeaway on this pass is that the async run loop code would be more readable without the two-level-deep anonymous inner classes. Surfacing how these classes interact with each other should make this easier to review.


samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java (line 73)
<https://reviews.apache.org/r/48243/#comment201378>

It's been a while and I'm not sure if we discussed this: is the goal to ultimately just switch over to async task for both cases (async and sync)?


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 118)
<https://reviews.apache.org/r/48243/#comment201758>

As far as I can tell, this will only run window and commit in the thread pool but otherwise will run process on the current thread.


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 123 - 128)
<https://reviews.apache.org/r/48243/#comment201794>

I believe this was copied from the original run loop. It seems to me that it would be nice for the run loop to have lifecycle methods (e.g. shutdown) and to have some other application setup class (the container?) be responsible for tying shutdown of the process to shutting down the run loop. This would allow the run loop lifecycle to be decoupled from the process lifecycle with no loss of generality.


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 275 - 276)
<https://reviews.apache.org/r/48243/#comment201783>

Only for commit and window, right?


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 437 - 442)
<https://reviews.apache.org/r/48243/#comment201786>

This is a bit confusing to me. I see that we're enqueueing an envelope wrapped in a pending envelope per task worker, but I don't see how we would end up dequeueing the same pending envelope twice. Could we expand on the documentation to make it clearer?

Also, depending on how we end up dequeueing twice, we need to ensure that the state change (mark processed) is visible. Is that the case?
The state is mutable and we're not using anything to publish the state change (as far as I can see). Either we should ensure this state is visible across threads, e.g. with a volatile or CAS, or we should document why it doesn't need to be.


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 518)
<https://reviews.apache.org/r/48243/#comment201791>

This is the second-level-deep non-static inner class. At this point reasoning about the interactions between everything is getting pretty difficult. Instead, could we make the interactions clearer by making these inner classes top-level and package-private? I suspect this might involve shuffling some things around. For example, maybe pendingEnvelopeQueue really belongs in workerState and the worker inserts and removes via the state instance.

Having a smaller non-static inner class, e.g. for a callback, is reasonable if it is not too complicated, but both of these, as is, are a bit too heavy.


samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala (lines 71 - 75)
<https://reviews.apache.org/r/48243/#comment201795>

Same comment as for the async run loop.


- Chris Pettitt


On June 4, 2016, 1:18 a.m., Xinyu Liu wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> -----------------------------------------------------------
> 
> (Updated June 4, 2016, 1:18 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 00648e49f8c7a9bbf5634e18ba0f95feb244613e
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3f25eca6e3dffc57360e8bd8c435177c2a9a910a
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cf3c4c0ab08a59760bc899c6f2027755e933b350
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 9e6641c3628290dc05e1eb5537e86bff9d37f92c
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala d32a92976e43ca24033b48c91851ee706de7de6b
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 8b863887cf584d2d7a9b18181c7b0cd1e9dfec00
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 2efe836fc3b622cbe89e2042a37407f3cf732f58
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala e280daa9626757cb4d17c0c03eed923277230c3e
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 1358fdd8a386f5f81128ef871c72833d8cf11d86
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 5457f0e05ae4d615b9c86f48a662c54b13828e78
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 09da62e0f9a10f7c3683345a309c6278ff01fb4b
> 
> Diff: https://reviews.apache.org/r/48243/diff/
> 
> 
> Testing
> -------
> 
> unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>
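
A minimal sketch of the visibility suggestion made for AsyncRunLoop.java (lines 437 - 442), assuming the "mark processed" state is a single boolean on the pending envelope wrapper. The class and method names here (PendingEnvelopeSketch, markProcessed, countWinners) are hypothetical and are not taken from the patch; the envelope is a plain String stand-in. The idea is only that a CAS on an AtomicBoolean both publishes the change to other threads and lets exactly one of several dequeuers claim the envelope.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the pending envelope wrapper; not the class in the patch.
final class PendingEnvelopeSketch {
  private final String envelope;
  // AtomicBoolean provides cross-thread visibility and an atomic check-then-set.
  private final AtomicBoolean processed = new AtomicBoolean(false);

  PendingEnvelopeSketch(String envelope) {
    this.envelope = envelope;
  }

  String getEnvelope() {
    return envelope;
  }

  // Returns true only for the single caller that flips the flag from false to true.
  boolean markProcessed() {
    return processed.compareAndSet(false, true);
  }

  boolean isProcessed() {
    return processed.get();
  }

  // Lets `threads` workers race to mark one shared envelope and counts how many succeed.
  static int countWinners(int threads) {
    PendingEnvelopeSketch pending = new PendingEnvelopeSketch("envelope-1");
    AtomicInteger winners = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < threads; i++) {
      pool.execute(() -> {
        if (pending.markProcessed()) {
          winners.incrementAndGet();
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for workers", e);
    }
    return winners.get();
  }

  public static void main(String[] args) {
    System.out.println("winners = " + countWinners(4));
  }
}
```

If the flag is only ever written by one thread and read by others, a volatile boolean would be enough for visibility; the CAS matters when more than one thread may try to mark the same envelope.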