-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review139626
-----------------------------------------------------------

A few more thoughts below.

Still not a fan of the direction we're going with the config. I know it is the status quo, but it further locks us into a limited model. One other benefit of the Offspring way of doing config that occurred to me while reading this: with Offspring you get all config violations in one shot, versus one per run (e.g. Samza fails fast on the first config problem). The latter is how LiSpring worked, and we intentionally addressed that as part of Offspring.


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (lines 125 - 126)
<https://reviews.apache.org/r/48356/#comment204894>

    Don't we need to stop the container directly here? shutdown will stop the executor from accepting any new work, but will not stop running work. In any case, wouldn't a clean shutdown here be better (e.g. for flushing state) than trying to force shutdown via the executor?


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 145)
<https://reviews.apache.org/r/48356/#comment204893>

    Do we need to ensure the previous container is stopped before starting the new container? For example, would it be possible for the new container and the old container to stomp on each other's local state if they're running at the same time? container.stop appears to be asynchronous and doesn't appear to give you any guarantee about when the container is actually stopped.

    ---

    Is the JobModelUpdateHandler called from the same thread that StreamProcessor.start is? If not (and given this is a callback, it's not a good assumption), you should make container volatile.


samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java (line 44)
<https://reviews.apache.org/r/48356/#comment204898>

    If this is write-once I would move this to the constructor and make it final. Otherwise, does this need to be volatile? It's hard to tell, as it is not clear how it is used. It might be worth noting in the class docs that this class is not thread safe.

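
    To illustrate the executor point on StreamProcessor.java (lines 125 - 126), here is a minimal sketch. It assumes a hypothetical FakeContainer that stands in for the real container; it is not Samza's SamzaContainer, and its stop() method and latches are made up for illustration. The only thing it relies on is standard java.util.concurrent behavior: shutdown() stops new submissions but leaves the running task alone, so the task only exits (and gets a chance to flush) once it is asked to stop directly, and awaitTermination gives the caller a way to wait for that.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ShutdownSketch {

    /** Hypothetical stand-in for a running container; NOT Samza's SamzaContainer. */
    static final class FakeContainer implements Runnable {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch stopRequested = new CountDownLatch(1);
        private volatile boolean flushed = false;

        @Override
        public void run() {
            started.countDown();
            try {
                stopRequested.await(); // simulates the run loop until stop() is called
                flushed = true;        // simulates flushing state on a clean exit
            } catch (InterruptedException e) {
                // A forced stop (e.g. shutdownNow) lands here and skips the flush.
                Thread.currentThread().interrupt();
            }
        }

        void stop() {
            stopRequested.countDown();
        }

        boolean isFlushed() {
            return flushed;
        }
    }

    /**
     * Returns three flags:
     * [0] executor terminated after shutdown() alone,
     * [1] executor terminated after the container was stopped directly,
     * [2] the container flushed its (simulated) state.
     */
    static boolean[] demonstrate() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        FakeContainer container = new FakeContainer();
        try {
            executor.submit(container);
            container.started.await();

            // Rejects new work, but does not interrupt the task already running.
            executor.shutdown();
            boolean afterShutdownOnly = executor.awaitTermination(200, TimeUnit.MILLISECONDS);

            // Clean path: ask the container to stop, then wait for it to finish.
            container.stop();
            boolean afterStop = executor.awaitTermination(5, TimeUnit.SECONDS);

            return new boolean[] {afterShutdownOnly, afterStop, container.isFlushed()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // safety net so the sketch never leaks a thread
        }
    }

    public static void main(String[] args) {
        boolean[] r = demonstrate();
        System.out.println("terminated after shutdown() only: " + r[0]);
        System.out.println("terminated after container.stop(): " + r[1]);
        System.out.println("state flushed: " + r[2]);
    }
}
```

    The same stop-then-await shape would also address the ordering concern on line 145, since the caller can block until the old container has actually exited before starting the new one. Whether that fits the real container's threading is for the patch author to judge.
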

samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java (lines 60 - 61)
<https://reviews.apache.org/r/48356/#comment204896>

    I would suggest using a block here versus a single statement. A single statement is easy to break, e.g.:

    ```
    if (systemFactoryClassName == null)
      log.error("error message");
      throw new SamzaException("error message");
    ```


samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java (lines 92 - 94)
<https://reviews.apache.org/r/48356/#comment204897>

    How is this used? It seems to be write-only.


samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala (lines 107 - 109)
<https://reviews.apache.org/r/48356/#comment204899>

    Should this ensure that stop is complete before returning? Alternatively, if we want to allow stop to be async, should we provide a way to wait for it?


samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java (line 50)
<https://reviews.apache.org/r/48356/#comment204900>

    To future-proof this a bit you could use AllSspToSingleTaskGrouperFactory.class.getName. Same for the one below.


- Chris Pettitt


On June 23, 2016, 1:14 a.m., Navina Ramesh wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
>
> (Updated June 23, 2016, 1:14 a.m.)
>
>
> Review request for samza, Chris Pettitt and Yi Pan (Data Infrastructure).
>
>
> Repository: samza
>
>
> Description
> -------
>
> Added ConfigBuilder and support classes
>
> Added JobCoordinator interfaces
>
>
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
>
>
> Added TestStreamProcessor and some unit tests for ConfigBuilders
>
>
> Changing who defined processorId
>
>
> Fixed checkstyle errors
>
>
> Replaced SamzaException with ConfigException
>
>
> Removing localityManager instantiation from Samza Container
>
>
> Diffs
> -----
>
>   build.gradle ba4a9d14fe24e1ff170873920cd5eeef656955af
>   checkstyle/import-control.xml 325c38131047836dc8aedaea4187598ef3ba7666
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 49b08f6b68dbb44757dcc8ce8d60c365a9d22981
>   samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala 08a4debb06f9925ae741049abb2ee0df97b2243b
>   samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala cf05c15c836ddfa54ba8fe27abc18ed88ac5fc11
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 18c09224bbae959342daf9b2b7a7d971cc224f48
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala 0aee4ced23ba730cca628fd1a59831007d348f56
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java 8f2dc4853a2b5dd712f25a2d2d16402bcba89d7a
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff
>
> Diff: https://reviews.apache.org/r/48356/diff/
>
>
> Testing
> -------
>
> ./gradlew clean build
>
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
>
>
> Thanks,
>
> Navina Ramesh
>
>