-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review137369
-----------------------------------------------------------




samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java (line 62)
<https://reviews.apache.org/r/48243/#comment202510>

    It would be better to throw an exception when there is a mixture of sync 
and async tasks here, since we don't plan to support that mixture, right?
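
    A minimal sketch of the check suggested above, not the actual 
RunLoopFactory code. SyncTask, AsyncTask, and TaskModeCheck are hypothetical 
stand-ins for the real task interfaces and factory logic:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical marker interfaces standing in for the sync and async task types.
interface SyncTask {}
interface AsyncTask {}

class TaskModeCheck {
    /**
     * Returns true when every task is async and false when none is;
     * throws when the list mixes both kinds.
     */
    static boolean isAsyncOnly(List<Object> tasks) {
        long asyncCount = tasks.stream().filter(t -> t instanceof AsyncTask).count();
        if (asyncCount != 0 && asyncCount != tasks.size()) {
            throw new IllegalArgumentException(
                "Mixing sync and async tasks is not supported: "
                    + asyncCount + " of " + tasks.size() + " tasks are async");
        }
        // An empty list is treated as "not async" in this sketch.
        return !tasks.isEmpty() && asyncCount == tasks.size();
    }
}
```

    Failing fast here keeps the unsupported configuration from reaching the 
run loop at all.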



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 85)
<https://reviews.apache.org/r/48243/#comment202520>

    



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 145)
<https://reviews.apache.org/r/48243/#comment202542>

    Check the new patch from Jake in SAMZA-951. We now exclude chooseNs from 
activeNs so that time spent waiting on network I/O is not counted as active 
time. AsyncRunLoop should be consistent with that.
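
    To illustrate the accounting described above, here is a rough sketch. 
LoopTimerSketch and its methods are hypothetical; only the chooseNs and 
activeNs names come from this comment, and the real metrics code may differ:

```java
// Hypothetical timer: chooseNs and activeNs mirror the metric names mentioned
// above, but the class and its methods are illustrative only.
class LoopTimerSketch {
    private long chooseNs = 0L;
    private long activeNs = 0L;

    /**
     * Records one loop iteration.
     *
     * @param totalNs       wall-clock time of the whole iteration
     * @param chooseBlockNs portion of totalNs spent inside choose()
     */
    void record(long totalNs, long chooseBlockNs) {
        if (chooseBlockNs < 0 || chooseBlockNs > totalNs) {
            throw new IllegalArgumentException("choose time must be within [0, totalNs]");
        }
        chooseNs += chooseBlockNs;
        // Active time excludes the time spent waiting in choose().
        activeNs += totalNs - chooseBlockNs;
    }

    long getChooseNs() { return chooseNs; }

    long getActiveNs() { return activeNs; }
}
```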



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 212)
<https://reviews.apache.org/r/48243/#comment202544>

    nit: defining INFINITE_WAIT_TIME = 0L here would improve readability. In 
addition, for the envelope == null case, we should add a comment explaining why 
we need to wait pollIntervalMs() here (I assume it is because choose() returns 
null immediately w/o waiting for network I/O?).
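
    Roughly what I have in mind, as a sketch only. WaitTimeSketch and 
waitTimeMs() are hypothetical, and the branch logic assumes that a timeout of 
0 is passed to Object.wait(), where 0 means "wait until notified":

```java
// Hypothetical helper, not the actual AsyncRunLoop code.
class WaitTimeSketch {
    // Object.wait(0) blocks until notified, so 0 acts as "no timeout".
    static final long INFINITE_WAIT_TIME = 0L;

    /**
     * Picks the timeout for the blocking wait.
     *
     * @param envelopeIsNull true when choose() returned no message
     * @param pollIntervalMs polling interval, assumed to be positive
     */
    static long waitTimeMs(boolean envelopeIsNull, long pollIntervalMs) {
        // Assumption: choose() returns null immediately instead of blocking on
        // network I/O, so the loop waits a bounded time before polling again.
        // Otherwise it waits until a worker signals that it is ready.
        return envelopeIsNull ? pollIntervalMs : INFINITE_WAIT_TIME;
    }
}
```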



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 220)
<https://reviews.apache.org/r/48243/#comment202546>

    It seems to me that it would be more readable if we explicitly separated 
the logic for the null-envelope case from the worker-busy case:
    if (envelope == null) {
    ...
    } else {
    ...
    }



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 252)
<https://reviews.apache.org/r/48243/#comment202695>

    Shouldn't we make processed an AtomicBoolean? That would make 
PendingEnvelope thread-safe and reduce the code to: return 
!processed.getAndSet(true);
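
    A minimal sketch of that idea. PendingEnvelopeSketch and markProcessed() 
are hypothetical names, not the ones in the patch:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for PendingEnvelope, reduced to the processed flag.
class PendingEnvelopeSketch {
    private final AtomicBoolean processed = new AtomicBoolean(false);

    /**
     * Atomically marks the envelope as processed.
     * Returns true only for the first caller; every later caller gets false.
     */
    boolean markProcessed() {
        return !processed.getAndSet(true);
    }
}
```

    getAndSet(true) returns the previous value, so exactly one thread sees 
false and wins, with no explicit locking.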



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 310)
<https://reviews.apache.org/r/48243/#comment202842>

    nit: This method does not return any object.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 515)
<https://reviews.apache.org/r/48243/#comment202840>

    nit: maybe it reads better w/ the name "hasPendingOps()"? Then the logic in 
blockIfBusy() is also easier to understand:
    if (worker.state.isReady() && (envelope != null || worker.state.hasPendingOps())) {
      // should continue running since the worker state is ready and there is
      // either a new message or some pending operations for the worker
    }



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 578)
<https://reviews.apache.org/r/48243/#comment202848>

    I don't quite get this point. It seems to me that a broadcast stream 
message will be passed to worker.state.insertEnvelope(envelope) for every task, 
which essentially creates a PendingEnvelope per task. Hence, w/ the current 
code, won't consumerMultiplexer.tryUpdate() be called from each task?


- Yi Pan (Data Infrastructure)


On June 9, 2016, 7:49 p.m., Xinyu Liu wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> -----------------------------------------------------------
> 
> (Updated June 9, 2016, 7:49 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 00648e49f8c7a9bbf5634e18ba0f95feb244613e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> 3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> cf3c4c0ab08a59760bc899c6f2027755e933b350 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  9e6641c3628290dc05e1eb5537e86bff9d37f92c 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> d32a92976e43ca24033b48c91851ee706de7de6b 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> 2efe836fc3b622cbe89e2042a37407f3cf732f58 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
> PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> e280daa9626757cb4d17c0c03eed923277230c3e 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 1358fdd8a386f5f81128ef871c72833d8cf11d86 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 5457f0e05ae4d615b9c86f48a662c54b13828e78 
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
> 09da62e0f9a10f7c3683345a309c6278ff01fb4b 
> 
> Diff: https://reviews.apache.org/r/48243/diff/
> 
> 
> Testing
> -------
> 
> unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>
