-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review139626
-----------------------------------------------------------



A few more thoughts below.

Still not a fan of the direction we're going with the config. I know it is 
status quo, but it further locks us into a limited model. One other benefit of 
the Offspring way of doing config that occurred to me while reading this is 
that with Offspring you get all config violations in one shot versus once per 
run (e.g. Samza fails fast on first config problem). The latter is how LiSpring 
worked and we intentionally addressed that as a part of Offspring.


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (lines 
125 - 126)
<https://reviews.apache.org/r/48356/#comment204894>

    Don't we need to stop the container directly here? shutdown will stop the 
executor from accepting any new work, but will not stop running work. In any 
case, wouldn't a clean shutdown here be better (e.g. for flushing state) then 
trying to force shutdown via the executor?



samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 
145)
<https://reviews.apache.org/r/48356/#comment204893>

    Do we need to ensure the previous container is stopped before starting the 
new container? For example, would it be possible for the new container and the 
old container to stomp on eachother's local state if they're running at the 
same time? container.stop appears to be asynchronous and doesn't appear to give 
you any guarantee about when the container is actually stopped.
    
    ---
    
    Is the JobModelUpdateHandler called from the same thread that 
StreamProcessor.start is? If not (and given this is a callback its not a good 
assumption) you should make container volatile.



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 (line 44)
<https://reviews.apache.org/r/48356/#comment204898>

    If this is write-once I would move this to the constructor and make it 
final. Otherwise does this need to be volatile? Its hard to tell as it is not 
clear how it is used. It might be worth noting in the class docs that this 
class is not thread safe.



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 (lines 60 - 61)
<https://reviews.apache.org/r/48356/#comment204896>

    I would suggest using a block here versus a single statement. It is easy to 
break, e.g.:
    
    ```
        if (systemFactoryClassName == null)
            log.error("error message");
            throw new SamzaException("error message")
    ```



samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
 (lines 92 - 94)
<https://reviews.apache.org/r/48356/#comment204897>

    How is this used? It seems to be write only?



samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala (lines 107 - 
109)
<https://reviews.apache.org/r/48356/#comment204899>

    Should this ensure that stop is complete before returning? Alternatively if 
we want to allow stop to be async, should we provide a way to wait for it?



samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java
 (line 50)
<https://reviews.apache.org/r/48356/#comment204900>

    To future proof this a bit you could use 
AllSspToSingleTaskGrouperFactory.class.getName. Same for the one below.


- Chris Pettitt


On June 23, 2016, 1:14 a.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated June 23, 2016, 1:14 a.m.)
> 
> 
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer 
> interface
> 
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> 
> Changing who defined processorId
> 
> 
> Fixed checkstyle errors
> 
> 
> Replaced SamzaException with ConfigException
> 
> 
> Removing localityManager instantiation from Samza Container
> 
> 
> Diffs
> -----
> 
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af 
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java 
> PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java
>  PRE-CREATION 
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java 
> PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java
>  PRE-CREATION 
>   
> samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java
>  PRE-CREATION 
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 
> 49b08f6b68dbb44757dcc8ce8d60c365a9d22981 
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 
> 08a4debb06f9925ae741049abb2ee0df97b2243b 
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala 
> cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11 
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
> 18c09224bbae959342daf9b2b7a7d971cc224f48 
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 
> 0aee4ced23ba730cca628fd1a59831007d348f56 
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 
> 56881d46be9f859999adabbbda20433b208e012e 
>   
> samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java
>  PRE-CREATION 
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java 
> PRE-CREATION 
>   
> samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java 
> PRE-CREATION 
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 
> 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a 
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java 
> bc95f31c0dcaaa68d483a6f152b61aba6c543fff 
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> 
> Thanks,
> 
> Navina Ramesh
> 
>

Reply via email to