-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review146969
-----------------------------------------------------------

Breaking for lunch, but don't want to risk losing my comments. Will pick up from "Scan".

samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (line 52)
<https://reviews.apache.org/r/47835/#comment213892>

Minor: final after access modifier for consistency with the rest of the code.

---

If this is intended to be public, I would make it top level (same for most other public nested classes). If you want to have a factory method for it, something like java's Collections (e.g. MessageStreams) is a nice way to group these.

samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (line 80)
<https://reviews.apache.org/r/47835/#comment213893>

Package private top level class?

---

This is very much a taste thing, but I've found myself shying away from abstract base classes in favor of interfaces over the years. A class like this doesn't provide a heck of a lot of value (it's a single field holder). It could be reframed as an interface (with getOutputStream). One of the nice things that fall out of that is that when someone reviews an Operator it is very clear where output stream comes from (it's initialized in the constructor and accessed via the getter).

samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (lines 152 - 154)
<https://reviews.apache.org/r/47835/#comment213894>

It's not obvious why a sink function would produce output. Even a tee function should be pass-through. Should our return type be void here?

samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (line 161)
<https://reviews.apache.org/r/47835/#comment213895>

If we're going to pass through the data this might better be called a TeeOperator. I'd expect a sink to produce no output.
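
To make the sink vs. tee distinction concrete, here is a minimal sketch of what I have in mind. The names SinkSketch and TeeSketch are hypothetical and do not come from the patch, and I'm assuming a simple push-style onNext call rather than whatever the operator graph actually uses. The sink consumes a message and returns void; the tee hands the message to a side consumer and then forwards the same message downstream unchanged.

```java
import java.util.function.Consumer;

// Hypothetical names throughout; none of these types are taken from the patch.

/** Terminal operator: consumes each message and produces no output. */
final class SinkSketch<M> {
  private final Consumer<M> sinkFn;

  SinkSketch(Consumer<M> sinkFn) {
    this.sinkFn = sinkFn;
  }

  /** Returns void on purpose: nothing can be chained after a sink. */
  void onNext(M message) {
    sinkFn.accept(message);
  }
}

/** Pass-through operator: shows each message to a side consumer, then forwards it. */
final class TeeSketch<M> {
  private final Consumer<M> sideFn;
  private final Consumer<M> downstream;

  TeeSketch(Consumer<M> sideFn, Consumer<M> downstream) {
    this.sideFn = sideFn;
    this.downstream = downstream;
  }

  void onNext(M message) {
    sideFn.accept(message);     // side effect only, e.g. logging or an external write
    downstream.accept(message); // the very same message continues down the pipeline
  }
}
```

With this split, the type signature alone tells a reader whether more operators can follow: nothing consumes a sink's result, while a tee always has a downstream.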

samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (line 197)
<https://reviews.apache.org/r/47835/#comment213899>

It is not clear from the documentation if you're allowed to produce no new state for a given incoming message. Given the way the interface is defined, I could see doing this in a slightly inconsistent way: for create you pass back a null SS, but for update you return back the input value (identity function). I wonder if we couldn't just have an update function that could take a null input. A potential side benefit would be that you could now delete state for a key (if you needed to) by producing a null output from update.

samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (lines 212 - 217)
<https://reviews.apache.org/r/47835/#comment213897>

Entry<SK, SS> feels a little odd here. The reason is that we already know how to map Message<K, M> to SK. What would happen, for example, if Message<K0, M0> mapped to SK0 and then a call to stateCreator with Message<K0, M0> produced Entry<SK1, SS0>? Instead, it seems that we should just take SS in and produce SS out. You could maybe argue that SK should come in on update, e.g. if it was an expensive function, but I suspect you don't want the function to be expensive anyway as this is going to be invoked for each message.

samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (line 246)
<https://reviews.apache.org/r/47835/#comment213902>

It would probably be clearer to say "generate an output w/ output type {@code Message<K2, M2>}". It's not immediately obvious what type R is here. I would also touch on the fact that (IIUC) the output is going to be a Collection<Message<K2, M2>> but that this is going to be flattened down to a regular stream of Message<K2, M2>.

samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (line 300)
<https://reviews.apache.org/r/47835/#comment213904>

Would it be possible that users would want to specify the store name themselves (e.g. for simplifying configuration and for being able to inspect state)?

samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java (line 318)
<https://reviews.apache.org/r/47835/#comment213908>

I'm struggling a bit to imagine how this works as we've got the graph evaluation code elsewhere (I'm assuming). Does this suggest that for each incoming message we consume all messages from the other stream? Wouldn't this be O(n * m) in that case? Could it be we might want to consolidate one stream down to an aggregate state for the join? Also, what happens if we get new messages from the Stream<Message<K, M2>> stream? Do we need to buffer the other stream to do joins in that case?

- Chris Pettitt

On Aug. 23, 2016, 7:23 a.m., Yi Pan (Data Infrastructure) wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
>
> (Updated Aug. 23, 2016, 7:23 a.m.)
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
>
> Bugs: SAMZA-914
> https://issues.apache.org/jira/browse/SAMZA-914
>
> Repository: samza
>
> Description
> -------
>
> SAMZA-914: initial draft of operator programming API. Design doc attached to SAMZA-914:
> https://issues.apache.org/jira/secure/attachment/12821524/SAMZA-914_%20operator%20Java%20programming%20API%20-%20Google%20Docs.pdf
>
> Diffs
> -----
>
> build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed
> gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb
> samza-api/src/test/java/org/apache/samza/config/TestConfig.java 5d066c5867e9df9e94e60bde825dedf10703b399
> samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/api/Scan.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/api/data/window/WindowOutput.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/api/join/Join.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/api/window/Trigger.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorBaseImpl.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION
> samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java PRE-CREATION
> samza-operator/src/test/java/org/apache/samza/operators/api/TestScanner.java PRE-CREATION
> samza-operator/src/test/java/org/apache/samza/operators/api/data/TestDataStream.java PRE-CREATION
> samza-operator/src/test/java/org/apache/samza/task/AssembleCallGraphTask.java PRE-CREATION
> samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION
> samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java PRE-CREATION
> samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java PRE-CREATION
> samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java PRE-CREATION
> samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java PRE-CREATION
> samza-sql-core/README.md PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java PRE-CREATION
> samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
> samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
> samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
> samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
> samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
> settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2
>
> Diff: https://reviews.apache.org/r/47835/diff/
>
> Testing
> -------
>
> ./gradlew clean build
>
> Thanks,
>
> Yi Pan (Data Infrastructure)
>