> On June 8, 2016, 8:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java, 
> > line 73
> > <https://reviews.apache.org/r/48243/diff/1/?file=1406254#file1406254line73>
> >
> >     It's been a while and I'm not sure if we discussed: is the goal to 
> > ultimately just switch over to async task for both cases (async and sync)?

Right, once the asycn task is working in prod, we will switch to use 
AsyncRunLoop only.


> On June 8, 2016, 8:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, lines 
> > 123-128
> > <https://reviews.apache.org/r/48243/diff/1/?file=1406255#file1406255line123>
> >
> >     I believe this was copied from the original run loop. It seems to me 
> > that it would be nice to have the run loop have lifecycle methods (e.g. 
> > shutdown) and have some other application setup class (the container?) that 
> > is responsible for tying shutdown of the process to shutting down the run 
> > loop. This would allow the run loop lifecycle to be decouple from the 
> > process lifecycle at no loss on generality.

I added the shutdown() method in both RunLoop and AsyncRunLoop, and now the 
shutdownhook is added in SamzaContainer. The logic looks much cleaner now :). 
Thanks.


> On June 8, 2016, 8:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, lines 
> > 437-442
> > <https://reviews.apache.org/r/48243/diff/1/?file=1406255#file1406255line437>
> >
> >     This is a bit confusing to me. I see that we're enqueueing an evelope 
> > wrapped in a pending envelope per task worker, but I don't see how we would 
> > end up dequeueing the same pending envelope twice. Could we expand on the 
> > documentation to make it clearer?
> >     
> >     Also, depending on how we end up dequeueing twice we need to ensure 
> > that the state change (mark processed) is visible. Is that the case? The 
> > state is mutable and we're not using anything to publish the state change 
> > (as far as I can see). Either we should ensure this state is visible across 
> > threads, e.g. with a volatile or CAS, or we should document why it doesn't 
> > need to be.

This is to handle an enevelope can be processed by multiple tasks, like the 
broadcast stream where one partition is consumed by all tasks. I added the java 
doc to make it clear.

Thanks for catching the state visibility problem among threads. The state is 
volatile now.


> On June 8, 2016, 8:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java, line 518
> > <https://reviews.apache.org/r/48243/diff/1/?file=1406255#file1406255line518>
> >
> >     This is the second level deep non-static inner class. At this point 
> > reasoning about the interactions between everything is getting pretty 
> > difficult. Instead, could we make the interactions a clearer by making 
> > these inner classes top-level, package private? I suspect this might 
> > involve shuffling some things around. For example, maybe 
> > pendingEnvelopeQueue really belongs in workerState and the worker inserts 
> > and removes via the state instance.
> >     
> >     Having a smaller non-static inner class, e.g. for a callback, is 
> > reasonable if it is not too complicated, but both of these, as is, are a 
> > bit too heavy.

Agree, I refactored the code so the state is renamed as AsyncTaskState and now 
both AsyncTaskWorker and AsyncTaskState are one-level inner class of 
AsyncRunLoop. The pending queue is moved into the state, which makes a lot of 
sense. Do you still think they need to be package private? This was following 
your earlier comments to make the logic in one place so it's easier to reason 
about.


> On June 8, 2016, 8:08 p.m., Chris Pettitt wrote:
> > samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala, lines 
> > 73-77
> > <https://reviews.apache.org/r/48243/diff/1/?file=1406265#file1406265line73>
> >
> >     Same comment as for the async run loop.

Fixed. As commented above.


- Xinyu


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48243/#review136357
-----------------------------------------------------------


On June 9, 2016, 7:49 p.m., Xinyu Liu wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48243/
> -----------------------------------------------------------
> 
> (Updated June 9, 2016, 7:49 p.m.)
> 
> 
> Review request for samza, Chris Pettitt, Navina Ramesh, and Yi Pan (Data 
> Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This is the main part of the change, including the following:
> 
> - New API for AsyncStreamTask and callback.
> - Multithread scheduling in AsyncRunLoop
> - Callback management for asyn tasks
> 
> 
> Diffs
> -----
> 
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2 
>   samza-api/src/main/java/org/apache/samza/task/AsyncStreamTask.java 
> PRE-CREATION 
>   samza-api/src/main/java/org/apache/samza/task/TaskCallback.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/AsyncStreamTaskAdapter.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/CoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackFactory.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackListener.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/util/Utils.java PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala 
> 00648e49f8c7a9bbf5634e18ba0f95feb244613e 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> 3f25eca6e3dffc57360e8bd8c435177c2a9a910a 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> cf3c4c0ab08a59760bc899c6f2027755e933b350 
>   
> samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
>  9e6641c3628290dc05e1eb5537e86bff9d37f92c 
>   samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala 
> d32a92976e43ca24033b48c91851ee706de7de6b 
>   
> samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
>  8b863887cf584d2d7a9b18181c7b0cd1e9dfec00 
>   samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala 
> 2efe836fc3b622cbe89e2042a37407f3cf732f58 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestAsyncStreamAdapter.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestCoordinatorRequests.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackImpl.java 
> PRE-CREATION 
>   samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java 
> PRE-CREATION 
>   samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala 
> e280daa9626757cb4d17c0c03eed923277230c3e 
>   
> samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
> 1358fdd8a386f5f81128ef871c72833d8cf11d86 
>   samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala 
> 5457f0e05ae4d615b9c86f48a662c54b13828e78 
>   samza-core/src/test/scala/org/apache/samza/system/TestSystemConsumers.scala 
> 09da62e0f9a10f7c3683345a309c6278ff01fb4b 
> 
> Diff: https://reviews.apache.org/r/48243/diff/
> 
> 
> Testing
> -------
> 
> unit tests and local testing.
> 
> 
> Thanks,
> 
> Xinyu Liu
> 
>

Reply via email to