-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review146969
-----------------------------------------------------------



Breaking for lunch, but don't want to risk losing my comments. Will pick up 
from "Scan".


samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(line 52)
<https://reviews.apache.org/r/47835/#comment213892>

    Minor: put final after the access modifier for consistency with the rest of 
the code.
    
    ---
    
    If this is intended to be public, I would make it top level (same for most 
other public nested classes). If you want to have a factory method for it, 
something like Java's Collections (e.g. MessageStreams) is a nice way to group 
these.
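
    A minimal sketch of this suggestion, assuming a simplified, hypothetical MessageStream<M> that only wraps a list (the class in the patch is richer). MessageStream is a top-level type, and a non-instantiable MessageStreams holder groups static factories the way java.util.Collections does. The factory names fromList and empty are illustrative only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical top-level stream type; in the patch this is a public nested class.
final class MessageStream<M> {
  private final List<M> messages;

  // Package-private: callers go through the MessageStreams factories.
  MessageStream(List<M> messages) {
    // Defensive, unmodifiable copy so the stream cannot change after creation.
    this.messages = Collections.unmodifiableList(new ArrayList<>(messages));
  }

  List<M> messages() {
    return messages;
  }
}

// Static factory holder, modeled on java.util.Collections.
final class MessageStreams {
  private MessageStreams() {
    // Not instantiable: only static factory methods live here.
  }

  static <M> MessageStream<M> fromList(List<M> messages) {
    return new MessageStream<>(messages);
  }

  static <M> MessageStream<M> empty() {
    return new MessageStream<>(Collections.<M>emptyList());
  }
}
```

    Routing construction through the holder leaves room to change the concrete type later without touching callers.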



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(line 80)
<https://reviews.apache.org/r/47835/#comment213893>

    Package private top level class?
    
    ---
    
    This is very much a taste thing, but I've found myself shying away from 
abstract base classes in favor of interfaces over the years. A class like this 
doesn't provide a heck of a lot of value (it's a single field holder). It could 
be reframed as an interface (with getOutputStream). One of the nice things that 
falls out of that is that when someone reviews an Operator it is very clear 
where output stream comes from (it's initialized in the constructor and 
accessed via the getter).
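
    A sketch of the interface-based alternative, using hypothetical Downstream, Operator, and MapOperator types rather than the patch's classes. The output stream is a final field set in the constructor and exposed only through getOutputStream, so a reviewer can see where it comes from without reading a base class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the patch's output stream type.
final class Downstream<M> {
  private final List<M> received = new ArrayList<>();

  void send(M message) {
    received.add(message);
  }

  List<M> received() {
    return received;
  }
}

// The interface replaces the single-field abstract base class.
interface Operator<IN, OUT> {
  Downstream<OUT> getOutputStream();

  void onNext(IN message);
}

// Concrete operator: the output stream arrives via the constructor
// and is exposed only through the getter.
final class MapOperator<IN, OUT> implements Operator<IN, OUT> {
  private final Function<IN, OUT> fn;
  private final Downstream<OUT> outputStream;

  MapOperator(Function<IN, OUT> fn, Downstream<OUT> outputStream) {
    this.fn = fn;
    this.outputStream = outputStream;
  }

  @Override
  public Downstream<OUT> getOutputStream() {
    return outputStream;
  }

  @Override
  public void onNext(IN message) {
    // Apply the mapping and forward the result downstream.
    outputStream.send(fn.apply(message));
  }
}
```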



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(lines 152 - 154)
<https://reviews.apache.org/r/47835/#comment213894>

    It's not obvious why a sink function would produce output. Even a tee 
function should be pass-through. Should our return type be void here?
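
    A sketch of the void-returning contract, assuming a hypothetical SinkFunction interface and a CollectingSink that stands in for a write to an external system. Because apply returns nothing, a sink cannot accidentally be chained as if it produced a stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink contract: consume a message, produce no output.
@FunctionalInterface
interface SinkFunction<M> {
  void apply(M message);
}

// Example sink that records what it was given, standing in for an external write.
final class CollectingSink<M> implements SinkFunction<M> {
  private final List<M> sent = new ArrayList<>();

  @Override
  public void apply(M message) {
    sent.add(message);
  }

  List<M> sent() {
    return sent;
  }
}
```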



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(line 161)
<https://reviews.apache.org/r/47835/#comment213895>

    If we're going to pass the data through, this might be better called a 
TeeOperator. I'd expect a sink to produce no output.
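
    For contrast, a hypothetical TeeOperator sketch: it runs a side effect and returns its input unchanged, so it fits Function<M, M> and can sit in the middle of a pipeline, unlike a void sink.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical tee: perform a side effect, then pass the same message through.
final class TeeOperator<M> implements Function<M, M> {
  private final Consumer<M> sideEffect;

  TeeOperator(Consumer<M> sideEffect) {
    this.sideEffect = sideEffect;
  }

  @Override
  public M apply(M message) {
    sideEffect.accept(message);
    return message; // pass-through: the output is exactly the input
  }
}
```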



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(line 197)
<https://reviews.apache.org/r/47835/#comment213899>

    It is not clear from the documentation if you're allowed to produce no new 
state for a given incoming message. Given the way the interface is defined, I 
could see doing this in a slightly inconsistent way: for create you pass back a 
null SS, but for update you return the input value (identity function). I 
wonder if we couldn't just have an update function that could take a null 
input. A potential side benefit would be that you could now delete state for a 
key (if you needed to) by producing a null output from update.
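
    A sketch of the single-update-function idea, assuming a hypothetical KeyedStateUpdater backed by a HashMap in place of a real state store. The update function receives null when no state exists yet (create), and returning null removes the key (delete), so there is one consistent convention instead of two.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical keyed state handling with one update function:
// null current state means "create", null returned state means "delete".
final class KeyedStateUpdater<M, SK, SS> {
  private final Map<SK, SS> store = new HashMap<>();
  private final Function<M, SK> stateKey;
  private final BiFunction<M, SS, SS> update;

  KeyedStateUpdater(Function<M, SK> stateKey, BiFunction<M, SS, SS> update) {
    this.stateKey = stateKey;
    this.update = update;
  }

  void onMessage(M message) {
    SK key = stateKey.apply(message);
    SS current = store.get(key); // null if no state exists yet
    SS next = update.apply(message, current);
    if (next == null) {
      store.remove(key); // null output deletes the state for this key
    } else {
      store.put(key, next);
    }
  }

  SS get(SK key) {
    return store.get(key);
  }
}
```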



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(lines 212 - 217)
<https://reviews.apache.org/r/47835/#comment213897>

    Entry<SK, SS> feels a little odd here. The reason is that we already know 
how to map Message<K, M> to SK. What would happen, for example, if Message<K0, 
M0> mapped to SK0 and then a call to stateCreator with Message<K0, M0> produced 
Entry<SK1, SS0>? Instead, it seems that we should just take SS in and produce 
SS out. You could maybe argue that SK should come in on update, e.g. if it was 
an expensive function, but I suspect you don't want the function to be 
expensive anyway as this is going to be invoked for each message.
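
    A small illustration of the Entry<SK, SS> concern, with hypothetical method names and a HashMap standing in for the store. When the creator returns an Entry, it can choose a key that differs from the one derived from the message, so a later lookup by that derived key misses the state. When the creator returns only SS, the key is derived in exactly one place.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class EntryKeyHazard {
  private EntryKeyHazard() {}

  // Entry-returning creator: the creator picks the key, then the state is
  // looked up the way the next message would find it, via stateKey(message).
  static <M, SK, SS> SS createThenLookupWithEntry(
      M message, Function<M, SK> stateKey, Function<M, Map.Entry<SK, SS>> stateCreator) {
    Map<SK, SS> store = new HashMap<>();
    Map.Entry<SK, SS> created = stateCreator.apply(message);
    store.put(created.getKey(), created.getValue());
    return store.get(stateKey.apply(message)); // null if the creator chose another key
  }

  // SS-only creator: the key always comes from stateKey, so lookups are consistent.
  static <M, SK, SS> SS createThenLookup(
      M message, Function<M, SK> stateKey, Function<M, SS> stateCreator) {
    Map<SK, SS> store = new HashMap<>();
    SK key = stateKey.apply(message);
    store.put(key, stateCreator.apply(message));
    return store.get(key);
  }
}
```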



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(line 246)
<https://reviews.apache.org/r/47835/#comment213902>

    It would probably be clearer to say "generate an output with type 
{@code Message<K2, M2>}". It's not immediately obvious what type R is here. I 
would also mention that (IIUC) the output is going to be a 
Collection<Message<K2, M2>>, but that this is going to be flattened down to a 
regular stream of Message<K2, M2>.
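
    A sketch of the flattening behavior described above, assuming plain values instead of Message<K2, M2> and a hypothetical FlatMap.flatMap helper. Each input yields a Collection, the collections are concatenated into a single output sequence, and an empty collection simply drops that input.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

final class FlatMap {
  private FlatMap() {}

  // Each input maps to a Collection; the results are concatenated in order.
  static <IN, OUT> List<OUT> flatMap(
      List<IN> inputs, Function<IN, ? extends Collection<OUT>> fn) {
    List<OUT> out = new ArrayList<>();
    for (IN in : inputs) {
      out.addAll(fn.apply(in)); // flatten: elements are emitted individually
    }
    return out;
  }
}
```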



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(line 300)
<https://reviews.apache.org/r/47835/#comment213904>

    Would it be possible that users would want to specify the store name 
themselves (e.g. for simplifying configuration and for being able to inspect 
state)?
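
    A sketch of what an explicit store name could look like, using a hypothetical StoreSpec with a factory for user-supplied names and a generated fallback. Nothing here reflects the patch's actual store naming.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical store spec: either a caller-chosen name or a generated one.
final class StoreSpec {
  private static final AtomicInteger NEXT_ID = new AtomicInteger();
  private final String storeName;

  private StoreSpec(String storeName) {
    this.storeName = storeName;
  }

  // User-supplied name, e.g. one that is referenced from job configuration.
  static StoreSpec named(String storeName) {
    return new StoreSpec(Objects.requireNonNull(storeName, "storeName"));
  }

  // Generated name; unique per JVM, but dependent on construction order.
  static StoreSpec generated(String operatorType) {
    return new StoreSpec(operatorType + "-store-" + NEXT_ID.getAndIncrement());
  }

  String storeName() {
    return storeName;
  }
}
```

    A counter-based generated name depends on the order in which operators are built, which is one reason a stable, user-chosen name may be easier to reference from configuration or when inspecting state.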



samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java 
(line 318)
<https://reviews.apache.org/r/47835/#comment213908>

    I'm struggling a bit to imagine how this works as we've got the graph 
evaluation code elsewhere (I'm assuming). Does this suggest that for each 
incoming message we consume all messages from the other stream? Wouldn't this 
be O(n * m) in that case? Might we want to consolidate one stream 
down to an aggregate state for the join? Also, what happens if we get new 
messages from the Stream<Message<K, M2>> stream? Do we need to buffer the other 
stream to do joins in that case?
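
    One possible answer to the buffering question, sketched as a symmetric hash join with hypothetical types (a standard technique, not necessarily what the patch intends). Each side buffers its messages by key, and an arriving message probes only the other side's bucket for the same key, so each message costs work proportional to its matches rather than a scan of the entire other stream. Late messages on either side still join because both sides are buffered.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical symmetric hash join on key K between a left and a right stream.
final class SymmetricHashJoin<K, L, R, OUT> {
  private final Map<K, List<L>> leftBuffer = new HashMap<>();
  private final Map<K, List<R>> rightBuffer = new HashMap<>();
  private final BiFunction<L, R, OUT> joinFn;

  SymmetricHashJoin(BiFunction<L, R, OUT> joinFn) {
    this.joinFn = joinFn;
  }

  // Buffer the left message, then join it with buffered right messages for the same key.
  List<OUT> onLeft(K key, L message) {
    leftBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
    List<OUT> out = new ArrayList<>();
    for (R r : rightBuffer.getOrDefault(key, Collections.emptyList())) {
      out.add(joinFn.apply(message, r));
    }
    return out;
  }

  // Symmetric case for messages arriving on the right stream.
  List<OUT> onRight(K key, R message) {
    rightBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
    List<OUT> out = new ArrayList<>();
    for (L l : leftBuffer.getOrDefault(key, Collections.emptyList())) {
      out.add(joinFn.apply(l, message));
    }
    return out;
  }
}
```

    The buffers here grow without bound; a real operator would presumably bound them with a window or TTL, which is also where consolidating one side into an aggregate state could fit.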


- Chris Pettitt


On Aug. 23, 2016, 7:23 a.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
> 
> (Updated Aug. 23, 2016, 7:23 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-914: initial draft of operator programming API. Design doc attached to 
> SAMZA-914: 
> https://issues.apache.org/jira/secure/attachment/12821524/SAMZA-914_%20operator%20Java%20programming%20API%20-%20Google%20Docs.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb
>   samza-api/src/test/java/org/apache/samza/config/TestConfig.java 5d066c5867e9df9e94e60bde825dedf10703b399
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/Scan.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/data/window/WindowOutput.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/join/Join.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/Trigger.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorBaseImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SinkOperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestScanner.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/data/TestDataStream.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/AssembleCallGraphTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java PRE-CREATION
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java PRE-CREATION
>   samza-sql-core/README.md PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2
> 
> Diff: https://reviews.apache.org/r/47835/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>
