-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review136357
-----------------------------------------------------------



Biggest takeaway on this pass is that the async run loop code would be more 
readable without the two-level-deep anonymous inner classes. Surfacing how 
these classes interact with each other should make this easier to review.


samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java (line 73)
<https://reviews.apache.org/r/48243/#comment201378>

    It's been a while and I'm not sure if we discussed: is the goal to 
ultimately just switch over to async task for both cases (async and sync)?
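    For reference, one way "async for both cases" could look is an adapter 
that wraps a synchronous task so that a single async run loop drives both kinds 
of task. The interfaces below (SyncTask, Callback, AsyncTask, 
SyncToAsyncAdapter) are hypothetical, simplified stand-ins, not the 
AsyncStreamTask/TaskCallback API in this patch; treat this as a sketch only.

```java
/** Hypothetical, simplified synchronous task (not the Samza API). */
interface SyncTask {
  void process(String message) throws Exception;
}

/** Hypothetical completion callback: exactly one method is invoked per message. */
interface Callback {
  void complete();
  void failure(Throwable t);
}

/** Hypothetical asynchronous task contract. */
interface AsyncTask {
  void processAsync(String message, Callback callback);
}

/**
 * Hypothetical adapter: lets an async-only run loop drive a synchronous task.
 * process() runs on the caller's thread; the callback fires once it returns.
 */
final class SyncToAsyncAdapter implements AsyncTask {
  private final SyncTask delegate;

  SyncToAsyncAdapter(SyncTask delegate) {
    this.delegate = delegate;
  }

  @Override
  public void processAsync(String message, Callback callback) {
    try {
      delegate.process(message);
    } catch (Exception e) {
      callback.failure(e);  // surface the error through the callback
      return;
    }
    callback.complete();    // outside the try so a throwing complete() is not reported as failure
  }
}
```

The design choice here is that the sync path pays no threading cost: the 
callback completes inline, so the run loop sees a sync task as an async task 
whose callback happens to fire immediately.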



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 118)
<https://reviews.apache.org/r/48243/#comment201758>

    As far as I can tell, this will only run window and commit in the thread 
pool but otherwise will run process on the current thread.



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 123 - 128)
<https://reviews.apache.org/r/48243/#comment201794>

    I believe this was copied from the original run loop. It would be nicer for 
the run loop to expose lifecycle methods (e.g. shutdown) and for some other 
application setup class (the container?) to be responsible for tying shutdown 
of the process to shutting down the run loop. This would decouple the run loop 
lifecycle from the process lifecycle with no loss of generality.
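    A rough sketch of that split, assuming hypothetical LifecycleRunLoop and 
SketchContainer classes (not the classes in this patch): the run loop only 
exposes shutdown() and awaitTermination(), and the container is the one place 
that registers a JVM shutdown hook and maps process shutdown onto the run loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical run loop: exposes lifecycle methods and never touches the JVM itself. */
final class LifecycleRunLoop implements Runnable {
  private volatile boolean shutdownRequested = false;
  private final CountDownLatch stopped = new CountDownLatch(1);

  @Override
  public void run() {
    try {
      while (!shutdownRequested) {
        // Placeholder for one iteration: choose, process, window, commit.
        Thread.yield();
      }
    } finally {
      stopped.countDown();  // signal that run() has exited
    }
  }

  /** Requests shutdown; safe to call from any thread. */
  void shutdown() {
    shutdownRequested = true;
  }

  /** Waits for run() to exit; returns false if the timeout elapses first. */
  boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
    return stopped.await(timeout, unit);
  }
}

/** Hypothetical container: the only component aware of the process lifecycle. */
final class SketchContainer {
  private final LifecycleRunLoop runLoop;

  SketchContainer(LifecycleRunLoop runLoop) {
    this.runLoop = runLoop;
  }

  /** Builds a hook that maps process shutdown onto run loop shutdown. */
  Thread shutdownHook(long timeoutMs) {
    return new Thread(() -> {
      runLoop.shutdown();
      try {
        runLoop.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "container-shutdown-hook");
  }

  /** Registers the hook with the JVM, then drives the run loop on this thread. */
  void run(long timeoutMs) {
    Runtime.getRuntime().addShutdownHook(shutdownHook(timeoutMs));
    runLoop.run();
  }
}
```

Because the run loop never calls Runtime itself, tests (or an embedding 
application) can start and stop it without going through the JVM shutdown path.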



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 275 - 276)
<https://reviews.apache.org/r/48243/#comment201783>

    Only for commit and window, right?



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (lines 437 - 442)
<https://reviews.apache.org/r/48243/#comment201786>

    This is a bit confusing to me. I see that we're enqueueing an envelope 
wrapped in a pending envelope per task worker, but I don't see how we would end 
up dequeueing the same pending envelope twice. Could we expand on the 
documentation to make it clearer?
    
    Also, depending on how we end up dequeueing twice, we need to ensure that 
the state change (mark processed) is visible. Is that the case? The state is 
mutable and we're not using anything to publish the state change (as far as I 
can see). Either we should ensure this state is visible across threads, e.g. 
with a volatile or CAS, or we should document why it doesn't need to be.
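    If a pending envelope can legitimately be dequeued more than once, 
something like the following would make the state change both visible and 
race-free. PendingEnvelope and markProcessed() here are hypothetical names for 
illustration, not the code in this patch: AtomicBoolean.compareAndSet publishes 
the flag across threads and ensures only the first caller gets true.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical wrapper for an envelope that may be handed to more than one
 * worker. The processed flag lives in an AtomicBoolean so the change is
 * visible to all threads, and compareAndSet makes check-then-set atomic.
 */
final class PendingEnvelope<E> {
  private final E envelope;
  private final AtomicBoolean processed = new AtomicBoolean(false);

  PendingEnvelope(E envelope) {
    this.envelope = envelope;
  }

  E envelope() {
    return envelope;
  }

  /** Returns true only for the first caller; later callers should skip this envelope. */
  boolean markProcessed() {
    return processed.compareAndSet(false, true);
  }

  boolean isProcessed() {
    return processed.get();
  }
}
```

A volatile boolean alone would give visibility but not an atomic 
check-then-set, so two threads could both read false and both process the 
envelope; the CAS closes that window.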



samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java (line 518)
<https://reviews.apache.org/r/48243/#comment201791>

    This is the second level deep non-static inner class. At this point 
reasoning about the interactions between everything is getting pretty 
difficult. Instead, could we make the interactions clearer by making these 
inner classes top-level and package-private? I suspect this might involve 
shuffling some things around. For example, maybe pendingEnvelopeQueue really 
belongs in workerState and the worker inserts and removes via the state 
instance.
    
    Having a smaller non-static inner class, e.g. for a callback, is reasonable 
if it is not too complicated, but both of these, as is, are a bit too heavy.
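    A minimal sketch of the shape I have in mind, assuming the pending queue 
moves into the worker state. WorkerState, Worker and EnvelopeProcessor are 
hypothetical top-level, package-private classes, not the existing inner 
classes; the sketch is not thread-safe and assumes a single thread touches the 
queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical callback type: small and single-purpose. */
interface EnvelopeProcessor<E> {
  void process(E envelope);
}

/**
 * Hypothetical top-level worker state that owns the pending queue, so every
 * insert and removal goes through one object. Not thread-safe.
 */
final class WorkerState<E> {
  private final Deque<E> pendingEnvelopeQueue = new ArrayDeque<>();

  void enqueue(E envelope) {
    pendingEnvelopeQueue.addLast(envelope);
  }

  /** Removes and returns the oldest pending envelope, or null if there is none. */
  E dequeue() {
    return pendingEnvelopeQueue.pollFirst();
  }

  boolean hasPending() {
    return !pendingEnvelopeQueue.isEmpty();
  }
}

/** Hypothetical top-level worker: depends only on its state and a processor. */
final class Worker<E> {
  private final WorkerState<E> state;
  private final EnvelopeProcessor<E> processor;

  Worker(WorkerState<E> state, EnvelopeProcessor<E> processor) {
    this.state = state;
    this.processor = processor;
  }

  /** Processes the next pending envelope, if any; returns whether work was done. */
  boolean runOnce() {
    E next = state.dequeue();
    if (next == null) {
      return false;
    }
    processor.process(next);
    return true;
  }
}
```

With the dependencies passed in explicitly, each class can be read and unit 
tested on its own instead of through the enclosing run loop.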



samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala (lines 71 - 75)
<https://reviews.apache.org/r/48243/#comment201795>

    Same comment as for the async run loop.


- Chris Pettitt


On June 4, 2016, 1:18 a.m., Xinyu Liu wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> -----------------------------------------------------------
> 
> (Updated June 4, 2016, 1:18 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithreaded scheduling in AsyncRunLoop
> - Callback management for async tasks
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 00648e49f8c7a9bbf5634e18ba0f95feb244613e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala cf3c4c0ab08a59760bc899c6f2027755e933b350 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala 9e6641c3628290dc05e1eb5537e86bff9d37f92c 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala d32a92976e43ca24033b48c91851ee706de7de6b 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala 8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 2efe836fc3b622cbe89e2042a37407f3cf732f58 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala e280daa9626757cb4d17c0c03eed923277230c3e 
>   samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 1358fdd8a386f5f81128ef871c72833d8cf11d86 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 5457f0e05ae4d615b9c86f48a662c54b13828e78 
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 09da62e0f9a10f7c3683345a309c6278ff01fb4b 
> 
> Diff: https://reviews.apache.org/r/48243/diff/
> 
> 
> Testing
> -------
> 
> Unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>
