-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/48356/#review136871
-----------------------------------------------------------

Thoughts below.

A more general thought: the builders don't seem to be providing a lot of value, and they're constraining you to a bad API (string-based, untyped). I'd strongly suggest that we use typed builders and map back to strings only where we must. For example, to take an ExecutorService, I'd much rather pass in an ExecutorService than create an ExecutorServiceFactory interface and then pass in the string name of a concrete implementation. The former is much easier to test because it is direct, and customization is simple: you just configure the ExecutorService the way you like.


samza-core/src/main/java/org/apache/samza/configbuilder/BuilderInterface.java (line 23)
<https://reviews.apache.org/r/48356/#comment201952>

This doesn't seem like a particularly useful interface because it is untyped. Anywhere you accept this interface, someone could pass in the wrong type of builder.


samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (lines 33 - 39)
<https://reviews.apache.org/r/48356/#comment201953>

Do these need to be public? I only checked JOB_NAME, but it looked like it could be private.


samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (lines 42 - 53)
<https://reviews.apache.org/r/48356/#comment201954>

Can these be private instead of package-private?


samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 80)
<https://reviews.apache.org/r/48356/#comment201955>

This is where setX and addX methods would make the API semantics clearer.


samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java (line 123)
<https://reviews.apache.org/r/48356/#comment201956>

FWIW, when I have mandatory arguments to a builder, I typically either take them in the builder's constructor or take them as arguments to the build call. That works well with typed arguments, because it becomes obvious at compile time that you need these values, but it would not work well here, where everything is a string.
This defers checking to runtime.


samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java (line 45)
<https://reviews.apache.org/r/48356/#comment201957>

More directly: `return Collections.singletonMap(taskName, ssps)`.


samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java (lines 45 - 47)
<https://reviews.apache.org/r/48356/#comment201958>

More compactly: `return Collections.singleton(containerModel)`.


samza-core/src/main/java/org/apache/samza/coordinator/AbstractJobCoordinator.java (line 23)
<https://reviews.apache.org/r/48356/#comment201972>

I'd probably ditch this. Over the years I've found that abstract base classes make code harder to follow and sometimes constrain you unnecessarily. They generally don't provide enough value to outweigh the cost. In this case, the code here appears to be used by only one concrete class, so I'd definitely just inline it there, which makes the code easier to follow. When we need more than one of these, we can choose the best mechanism for sharing code (e.g. composition, external helper code).


samza-core/src/main/java/org/apache/samza/coordinator/AbstractJobCoordinator.java (lines 24 - 25)
<https://reviews.apache.org/r/48356/#comment201960>

I would strongly suggest making these private, or at most protected. Allowing direct access to fields constrains your ability to change implementation details down the road. I'd also suggest taking these at construction time and making them final if possible, which also simplifies how other code can interact with them.

---

Does this class need to be thread-safe? If not, I'd document that it is not. If it does, then you really do need to either go the "final" route or use volatile or some other mechanism to ensure that changes are visible across threads.
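
A minimal Java sketch of the typed-builder suggestions above, under stated assumptions: mandatory values (here a job name and an ExecutorService passed in directly, not named by a factory class string) are builder constructor arguments, a "set" method replaces a single value, an "add" method accumulates, and build() returns an object whose fields are all final. `ProcessorSettings`, `Builder`, `setCommitIntervalMs`, `addInputStream`, and the 60-second default are hypothetical illustrations, not part of Samza's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

// Hypothetical settings object built by a typed builder; not part of Samza's API.
final class ProcessorSettings {
  // All fields are final and assigned in the constructor, so a fully constructed
  // instance can be shared across threads without volatile or locks.
  private final String jobName;
  private final ExecutorService executor;
  private final long commitIntervalMs;
  private final List<String> inputStreams;

  private ProcessorSettings(Builder b) {
    this.jobName = b.jobName;
    this.executor = b.executor;
    this.commitIntervalMs = b.commitIntervalMs;
    // Defensive, unmodifiable copy: later builder changes cannot leak in.
    this.inputStreams = Collections.unmodifiableList(new ArrayList<>(b.inputStreams));
  }

  String jobName() { return jobName; }
  ExecutorService executor() { return executor; }
  long commitIntervalMs() { return commitIntervalMs; }
  List<String> inputStreams() { return inputStreams; }

  static final class Builder {
    // Mandatory values are constructor arguments, so omitting one is a compile error.
    private final String jobName;
    private final ExecutorService executor;
    // Optional value with a (hypothetical) default; "set" replaces the previous value.
    private long commitIntervalMs = 60_000L;
    // Multi-valued setting; "add" accumulates.
    private final List<String> inputStreams = new ArrayList<>();

    Builder(String jobName, ExecutorService executor) {
      this.jobName = Objects.requireNonNull(jobName, "jobName");
      this.executor = Objects.requireNonNull(executor, "executor");
    }

    Builder setCommitIntervalMs(long commitIntervalMs) {
      if (commitIntervalMs <= 0) {
        throw new IllegalArgumentException("commitIntervalMs must be positive");
      }
      this.commitIntervalMs = commitIntervalMs;
      return this;
    }

    Builder addInputStream(String stream) {
      inputStreams.add(Objects.requireNonNull(stream, "stream"));
      return this;
    }

    ProcessorSettings build() {
      return new ProcessorSettings(this);
    }
  }
}
```

Because every field is final and nothing escapes the constructor, this takes the "final" route for thread-safety; the caller still owns the ExecutorService and is responsible for shutting it down. Mapping these typed values back to string config keys, where Samza requires them, could then happen in one place inside build().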


samza-core/src/main/java/org/apache/samza/coordinator/AbstractJobCoordinator.java (lines 52 - 54)
<https://reviews.apache.org/r/48356/#comment201961>

If you do end up not taking this in the constructor, I would suggest renaming this method to "setJobModelUpdateHandler". The current name suggests to me that you can register more than one handler, while "set" is unambiguous.


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 39)
<https://reviews.apache.org/r/48356/#comment201969>

This should be final. I would construct the container at construction time. If the intention is to allow multiple restarts with new containers, then we need code to handle invalid states (e.g. stop called before start).


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 41)
<https://reviews.apache.org/r/48356/#comment201968>

This is not what I was getting at. What I intended was that your factory for this class would take an optional executor, which would be passed in here at construction time. In its current state, it cannot be overridden without subclassing.


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 42)
<https://reviews.apache.org/r/48356/#comment201971>

I'd make this final.


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 47)
<https://reviews.apache.org/r/48356/#comment201967>

What is the purpose of this class? It seems to be trying to orchestrate a few different services. If so, I'd definitely give it a name that better reflects that intent. I'd also suggest not making it a JobModelUpdateHandler as well (that could be an inner class).


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 52)
<https://reviews.apache.org/r/48356/#comment201964>

It is not thread-safe to leak the `this` reference from the constructor.
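
The StreamProcessor comments above (final fields, a container created at construction time, an executor passed in rather than created internally, a private handler instead of implementing JobModelUpdateHandler directly, no `this` leaking from the constructor, and handling stop before start) could be combined roughly as in this Java sketch. `StreamProcessorSketch`, `CoordinatorListener`, and the simplified `JobCoordinator` and `JobModelUpdateHandler` interfaces are hypothetical stand-ins, not the classes in this review request; the container is modeled as a plain `Runnable`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical stand-ins for the coordinator interfaces; the real ones in this
// review request may have different signatures.
interface JobModelUpdateHandler {
  void onJobModelUpdate(String jobModel);
}

interface JobCoordinator {
  void setJobModelUpdateHandler(JobModelUpdateHandler handler);
  void start();
  void stop();
}

// Final, constructor-injected dependencies; no "this" escaping the constructor;
// explicit lifecycle states.
final class StreamProcessorSketch {
  private enum State { NEW, STARTED, STOPPED }

  private final JobCoordinator coordinator;
  private final Runnable container;       // created up front, never reassigned
  private final ExecutorService executor; // injected and owned by the caller

  private State state = State.NEW;        // guarded by "this"
  private Future<?> containerFuture;      // guarded by "this"

  StreamProcessorSketch(JobCoordinator coordinator, Runnable container, ExecutorService executor) {
    // Only plain field assignments here; the handler is registered in start(),
    // so no reference to this object escapes during construction.
    this.coordinator = coordinator;
    this.container = container;
    this.executor = executor;
  }

  synchronized void start() {
    if (state != State.NEW) {
      throw new IllegalStateException("start() called in state " + state);
    }
    coordinator.setJobModelUpdateHandler(new CoordinatorListener());
    coordinator.start();
    containerFuture = executor.submit(container);
    state = State.STARTED;
  }

  synchronized void stop() {
    // Rejecting stop() before start() replaces an NPE with a clear error.
    if (state != State.STARTED) {
      throw new IllegalStateException("stop() called in state " + state);
    }
    coordinator.stop();
    containerFuture.cancel(true); // interrupts the container thread if still running
    state = State.STOPPED;        // single use: no restart with a new container
  }

  // Private inner class, so StreamProcessorSketch does not itself have to
  // implement JobModelUpdateHandler.
  private final class CoordinatorListener implements JobModelUpdateHandler {
    @Override
    public void onJobModelUpdate(String jobModel) {
      // A real processor would react to the new job model here.
    }
  }
}
```

This version treats the processor as single-use, the simpler of the two options raised above; supporting restarts would mean creating a fresh container in start() and allowing a STOPPED to STARTED transition. Throwing IllegalStateException is one choice; making stop() a no-op when nothing was started would also avoid the NPE. A factory could supply a default executor when the caller does not pass one.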


samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java (line 68)
<https://reviews.apache.org/r/48356/#comment201970>

To reinforce some comments above: this can lead to an NPE if stop is called before start.


samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala (lines 617 - 619)
<https://reviews.apache.org/r/48356/#comment201973>

I hope this is temporary. If not, it would be nicer to provide proper lifecycle and shutdown mechanisms. For example, on stop, I may want to flush a cache to disk.


- Chris Pettitt


On June 8, 2016, 9:59 p.m., Navina Ramesh wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/48356/
> -----------------------------------------------------------
> 
> (Updated June 8, 2016, 9:59 p.m.)
> 
> Review request for samza and Chris Pettitt.
> 
> Repository: samza
> 
> Description
> -------
> 
> Added ConfigBuilder and support classes
> 
> Added JobCoordinator interfaces
> 
> Adding StreamProcessor, StandaloneJobCoordinator and updating SamzaContainer interface
> 
> Added TestStreamProcessor and some unit tests for ConfigBuilders
> 
> Changing who defined processorId
> 
> Fixed checkstyle errors
> 
> Replaced SamzaException with ConfigException
> 
> Removing localityManager instantiation from Samza Container
> 
> Diffs
> -----
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed
>   checkstyle/import-control.xml 7a09c7ed8ab372d2342f31e850ae09c605292eb2
>   samza-core/src/main/java/org/apache/samza/configbuilder/BuilderInterface.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/CheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/ConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/GenericConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaCheckpointConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/KafkaSystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SerdeConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/StandaloneConfigBuilder.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/configbuilder/SystemStreamConfig.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/stream/AllSspToSingleTaskGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouper.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/container/grouper/task/SingleContainerGrouperFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/AbstractJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/coordinator/JobModelUpdateHandler.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinator.java PRE-CREATION
>   samza-core/src/main/java/org/apache/samza/standalone/StandaloneJobCoordinatorFactory.java PRE-CREATION
>   samza-core/src/main/scala/org/apache/samza/config/JobConfig.scala 4d5ca4d3d2dd1542c5d073dfe6c13666ef5f51fc
>   samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 951d479606423c39d600d69025c940af1f0f0600
>   samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala e9a51083aff4dc316e94144f6242fe702ca73a68
>   samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala 56881d46be9f859999adabbbda20433b208e012e
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestGenericConfigBuilder.java PRE-CREATION
>   samza-core/src/test/java/org/apache/samza/configbuilder/TestStandaloneConfigBuilder.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/MyStreamTask.java PRE-CREATION
>   samza-test/src/test/java/org/apache/samza/processor/TestStreamProcessor.java PRE-CREATION
>   samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java c556d833059826d97ad6928de47330effba40219
>   samza-yarn/src/main/java/org/apache/samza/job/yarn/SamzaTaskManager.java bc95f31c0dcaaa68d483a6f152b61aba6c543fff
> 
> Diff: https://reviews.apache.org/r/48356/diff/
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> Local integration test:
> ./bin/grid start zookeeper
> ./bin/grid start kafka
> Then, run TestStreamProcessor.java
> 
> Thanks,
> 
> Navina Ramesh