-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137369
-----------------------------------------------------------

samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java (line 62)
<https://reviews.apache.org/r/48243/#comment202510>

    It would be better to throw an exception when there is a mixture of sync and async tasks here, since we don't plan to support that mixture, right?


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 85)
<https://reviews.apache.org/r/48243/#comment202520>


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 145)
<https://reviews.apache.org/r/48243/#comment202542>

    Check the new patch from Jake in SAMZA-951. We are now excluding chooseNs from activeNs, so that the wait on network I/O is not counted in activeNs. We should be consistent in AsyncRunLoop here as well.


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 212)
<https://reviews.apache.org/r/48243/#comment202544>

    nit: it would improve readability to define INFINITE_WAIT_TIME = 0L here.

    In addition, if envelope == null, we should add a comment explaining why we need to wait pollIntervalMs() here (I assume that's because choose() will return null immediately w/o waiting for network I/O?).


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 220)
<https://reviews.apache.org/r/48243/#comment202546>

    It seems to me that it is more readable if we explicitly separate the logic for the null envelope case from the worker busy case:

    if (envelope == null) { ... } else { ... }


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 252)
<https://reviews.apache.org/r/48243/#comment202695>

    Shouldn't we make this processed flag an AtomicBoolean? It would make PendingEnvelope thread-safe and reduce the code to:

    return !processed.getAndSet(true);


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 310)
<https://reviews.apache.org/r/48243/#comment202842>

    nit: This method does not return any object.

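To make the AtomicBoolean suggestion for line 252 above concrete, here is a minimal sketch. The class name PendingEnvelopeSketch, the generic payload type, and the method name markProcessed() are placeholders of mine and are not taken from the patch; the actual PendingEnvelope may look different.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical stand-in for PendingEnvelope (assumption, not the patch's class).
 * Only the "processed" flag discussed in the review is modeled here.
 */
class PendingEnvelopeSketch<T> {
  private final T envelope;
  // Stays false until some thread claims the envelope.
  private final AtomicBoolean processed = new AtomicBoolean(false);

  PendingEnvelopeSketch(T envelope) {
    this.envelope = envelope;
  }

  T getEnvelope() {
    return envelope;
  }

  /**
   * Atomically sets the flag to true and reports whether this call was the
   * one that changed it. Only the first caller gets true, so at most one
   * thread acts on the envelope, without a synchronized block.
   */
  boolean markProcessed() {
    return !processed.getAndSet(true);
  }
}
```

With this shape, the first call to markProcessed() returns true and every later call returns false, regardless of which thread makes it.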

samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 515)
<https://reviews.apache.org/r/48243/#comment202840>

    nit: maybe it reads better w/ the name "hasPendingOps()"? Then the logic in blockIfBusy() is also easier to understand:

    if (worker.state.isReady() && (envelope != null || worker.state.hasPendingOps())) {
      // should continue running since the worker state is ready and there is
      // either a new message or some pending operations for the worker
    }


samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 578)
<https://reviews.apache.org/r/48243/#comment202848>

    I don't quite get this point. It seems to me that for a broadcast stream message, worker.state.insertEnvelope(envelope) will be called for all tasks, which will essentially create a PendingEnvelope for each task. Hence, w/ the current code, it will call consumerMultiplexer.tryUpdate() from each task?


- Yi Pan (Data Infrastructure)


On June 9, 2016, 7:49 p.m., Xinyu Liu wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> -----------------------------------------------------------
>
> (Updated June 9, 2016, 7:49 p.m.)
>
>
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data Infrastructure).
>
>
> Repository: samza
>
>
> Description
> -------
>
> This is the main part of the change, including the following:
>
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
>
>
> Diffs
> -----
>
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java PRE-CREATION
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 00648e49f8c7a9bbf5634e18ba0f95feb244613e
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3f25eca6e3dffc57360e8bd8c435177c2a9a910a
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cf3c4c0ab08a59760bc899c6f2027755e933b350
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 9e6641c3628290dc05e1eb5537e86bff9d37f92c
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala d32a92976e43ca24033b48c91851ee706de7de6b
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 8b863887cf584d2d7a9b18181c7b0cd1e9dfec00
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 2efe836fc3b622cbe89e2042a37407f3cf732f58
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java PRE-CREATION
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala e280daa9626757cb4d17c0c03eed923277230c3e
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 1358fdd8a386f5f81128ef871c72833d8cf11d86
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 5457f0e05ae4d615b9c86f48a662c54b13828e78
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 09da62e0f9a10f7c3683345a309c6278ff01fb4b
>
> Diff: https://reviews.apache.org/r/48243/diff/
>
>
> Testing
> -------
>
> unit tests and local testing.
>
>
> Thanks,
>
> Xinyu Liu
>
>