> On Aug. 26, 2016, 4:01 p.m., Chris Pettitt wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, lines 152-154
> > <https://reviews.apache.org/r/47835/diff/12/?file=1481728#file1481728line152>
> >
> > It's not obvious why a sink function would produce output. Even a tee
> > function should be pass-through. Should our return type be void here?

Makes sense. The original idea was to implement a tee-like operator whose output could optionally be used, but that does create confusion in the semantics. Let me clean it up.


> On Aug. 26, 2016, 4:01 p.m., Chris Pettitt wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 161
> > <https://reviews.apache.org/r/47835/diff/12/?file=1481728#file1481728line161>
> >
> > If we're going to pass through the data this might better be called a
> > TeeOperator. I'd expect a sink to produce no output.

Makes sense to me. Thanks!


> On Aug. 26, 2016, 4:01 p.m., Chris Pettitt wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 197
> > <https://reviews.apache.org/r/47835/diff/12/?file=1481728#file1481728line197>
> >
> > It is not clear from the documentation if you're allowed to produce no
> > new state for a given incoming message. Given the way the interface is
> > defined, I could see doing this in a slightly inconsistent way: for create
> > you pass back a null SS, but for update you return back the input value
> > (identity function). I wonder if we couldn't just have an update function
> > that could take a null input. A potential side benefit would be that you
> > could now delete state for a key (if you needed to) by producing a null
> > output from update.

Thanks for pointing it out. I went back and forth on whether we want two functions or a single combined update function, as you suggest. I left the two functions here because I was not sure whether the state store programming APIs that we expose to the user would need to differentiate those two cases. But it makes sense to converge on a single function, even if the state store programming APIs end up with multiple distinct functions to perform insert/update/delete in the state store. BTW, the state store programming APIs are intentionally left out of this patch and need to be worked on later.


> On Aug. 26, 2016, 4:01 p.m., Chris Pettitt wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 197
> > <https://reviews.apache.org/r/47835/diff/12/?file=1481728#file1481728line197>
> >
> > It is not clear from the documentation if you're allowed to produce no
> > new state for a given incoming message. Given the way the interface is
> > defined, I could see doing this in a slightly inconsistent way: for create
> > you pass back a null SS, but for update you return back the input value
> > (identity function). I wonder if we couldn't just have an update function
> > that could take a null input. A potential side benefit would be that you
> > could now delete state for a key (if you needed to) by producing a null
> > output from update.

Duplicate of the comment above. Please see my reply there.


> On Aug. 26, 2016, 4:01 p.m., Chris Pettitt wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, lines 212-217
> > <https://reviews.apache.org/r/47835/diff/12/?file=1481728#file1481728line212>
> >
> > Entry<SK, SS> feels a little odd here. The reason is that we already
> > know how to map Message<K, M> to SK. What would happen, for example, if
> > Message<K0, M0> mapped to SK0 and then a call to stateCreator with
> > Message<K0, M0> produced Entry<SK1, SS0>? Instead, it seems that we should
> > just take SS in and produce SS out. You could maybe argue that SK should
> > come in on update, e.g. if it was an expensive function, but I suspect you
> > don't want the function to be expensive anyway as this is going to be
> > invoked for each message.

Good point. I will make the change so that it is consistent about which key in the state store a given input message is applied to.


> On Aug. 26, 2016, 4:01 p.m., Chris Pettitt wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 246
> > <https://reviews.apache.org/r/47835/diff/12/?file=1481728#file1481728line246>
> >
> > It would probably be clearer to say "generate an output w/ output type
> > {@code Message<K2, M2>}. It's not immediately obvious what type R is here.
> > I would also touch on the fact that (IIUC) the output is going to be a
> > Collection<Message<K2, M2>> but that this is going to be flattened down to
> > a regular stream of Message<K2, M2>.

Good point. Done.


> On Aug. 26, 2016, 4:01 p.m., Chris Pettitt wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 300
> > <https://reviews.apache.org/r/47835/diff/12/?file=1481728#file1481728line300>
> >
> > Would it be possible that users would want to specify the store name
> > themselves (e.g. for simplifying configuration and for being able to
> > inspect state)?

Yes. I had it in mind but did not put it into this patch, since it would require scoping out the state store programming APIs for the user. Currently, the setter interfaces for the state store APIs are intentionally not exposed to programmers. I plan to keep the scope restricted for now and address this when working on the state store programming APIs.


> On Aug. 26, 2016, 4:01 p.m., Chris Pettitt wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java, line 318
> > <https://reviews.apache.org/r/47835/diff/12/?file=1481728#file1481728line318>
> >
> > I'm struggling a bit to imagine how this works as we've got the graph
> > evaluation code elsewhere (I'm assuming). Does this suggest that for each
> > incoming message we consume all messages from the other stream? Wouldn't
> > this be O(n * m) in that case? Could it be we might want to consolidate one
> > stream down to an aggregate state for the join? Also, what happens if we
> > get new messages from the Stream<Message<K, M2>> stream? Do we need to
> > buffer the other stream to do joins in that case?

We do not necessarily consume all the incoming messages from the other stream. The Join API in this version only covers a unique-key join, in which the key is unique in both streams. However, the PartialJoinOperator is an internal implementation class, and I am hoping to keep it generic enough to handle cases where the join key may not be unique in the buffered stream. The way I imagine it working is:

a) The join key is derived from the input message, and we use the join key as a selector to get the buffered messages from the other stream. The selector can be a unique key or a range query based on the join key.

b) The transformation function is applied to the input message and the set of buffered messages returned by the selector, and the output is a collection of merged messages.

Since this BiFunction construct is not actually seen or used by users, the user only writes a 1:1 join merge function and the implementation performs a single key lookup, without the penalty of an n*m cross-product. Hence, this Stream view of the join stream is not exposed to the user for now, which lets us be more flexible in extending it later.

As for newly arriving messages from MessageStream<K, M2>, they do the reverse lookup into the buffered store for MessageStream<K, M1> to join. Hence, there is a selfStoreFunctions in the PartialJoinOperator to buffer the Message<K, M1> into a different store as well, for the reverse lookup.

So, to summarize: yes, we will need a buffered store for each input stream in the join. A particular PartialJoinOperator on MessageStream<K, M1> uses the buffered store for MessageStream<K, M2> as its joinStore for lookups, and saves its own messages into the buffered store for MessageStream<K, M1> as its selfStore, which the other PartialJoinOperator looks up. Hope this makes sense.

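To illustrate the two-store lookup described above, here is a minimal sketch. It is not the PartialJoinOperator from the patch: the class PartialJoin, the method onMessage, and the use of in-memory HashMaps as the buffered stores are all assumptions made for illustration, and it covers only the unique-key case.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical illustration only; names and types are NOT taken from the patch.
public class PartialJoinSketch {

    /** One side of a unique-key join: buffers its own messages and probes the other side's store. */
    static final class PartialJoin<K, M, O, R> {
        private final Map<K, M> selfStore;        // buffered messages of this stream
        private final Map<K, O> joinStore;        // buffered messages of the other stream
        private final BiFunction<M, O, R> merge;  // user-written 1:1 merge function

        PartialJoin(Map<K, M> selfStore, Map<K, O> joinStore, BiFunction<M, O, R> merge) {
            this.selfStore = selfStore;
            this.joinStore = joinStore;
            this.merge = merge;
        }

        /** Buffers the message for the reverse lookup, then does a single key lookup on the other side. */
        List<R> onMessage(K key, M message) {
            selfStore.put(key, message);
            List<R> output = new ArrayList<>();
            O other = joinStore.get(key);
            if (other != null) {
                output.add(merge.apply(message, other));
            }
            return output;
        }
    }

    public static void main(String[] args) {
        Map<String, String> store1 = new HashMap<>();   // buffer for stream 1
        Map<String, Integer> store2 = new HashMap<>();  // buffer for stream 2

        // Each side uses the other side's buffer as its joinStore.
        PartialJoin<String, String, Integer, String> left =
                new PartialJoin<>(store1, store2, (m1, m2) -> m1 + ":" + m2);
        PartialJoin<String, Integer, String, String> right =
                new PartialJoin<>(store2, store1, (m2, m1) -> m1 + ":" + m2);

        System.out.println(left.onMessage("k1", "a"));  // prints []
        System.out.println(right.onMessage("k1", 7));   // prints [a:7]
        System.out.println(left.onMessage("k2", "b"));  // prints []
    }
}
```

Each incoming message costs one put and one get, so the work per message does not depend on the size of the other stream. A non-unique join key would need the store to return a set of buffered messages instead of a single value.
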
Again, the state store programming APIs for the join stores are intentionally left out and hidden from the end user for now. I plan to work on them later.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review146969
-----------------------------------------------------------


On Aug. 26, 2016, 8:43 p.m., Yi Pan (Data Infrastructure) wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
>
> (Updated Aug. 26, 2016, 8:43 p.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
>
>
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-914: initial draft of operator programming API.
> Design doc attached to SAMZA-914:
> https://issues.apache.org/jira/secure/attachment/12821524/SAMZA-914_%20operator%20Java%20programming%20API%20-%20Google%20Docs.pdf
>
>
> Diffs
> -----
>
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb
>   samza-api/src/test/java/org/apache/samza/config/TestConfig.java 5d066c5867e9df9e94e60bde825dedf10703b399
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/Scan.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/data/window/WindowOutput.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/join/Join.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/Trigger.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorBaseImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestScanner.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/data/TestDataStream.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/AssembleCallGraphTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java PRE-CREATION
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java PRE-CREATION
>   samza-sql-core/README.md PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2
>
> Diff: https://reviews.apache.org/r/47835/diff/
>
>
> Testing
> -------
>
> ./gradlew clean build
>
>
> Thanks,
>
> Yi Pan (Data Infrastructure)
>
>