> On July 18, 2016, 8:01 p.m., Chris Pettitt wrote:
> > The description says this is the initial draft implementation, but the 
> > title says initial draft for the APIs. I take it the latter is more 
> > accurate?
> > 
> > Some initial high-level thoughts:
> > 
> > 1. We should use Java 8 constructs where possible (if we've moved to Java 
> > 8). For example, Function. If not, we should probably have some Function 
> > equivalent and various type specifications for map, flatMap, etc.
> > 2. The stream operators don't seem to **do** anything - they primarily 
> > appear to hold metadata about something to be done. Are they intended only 
> > to be declarative? Are they also not intended to know directly how to build 
> > themselves, but rather you would run some graph processor (with knowledge 
> > about each operator type) to build out a real processor? This is pretty 
> > fundamental to the design. If there is a doc that covers this let me know; 
> > otherwise it would be super helpful to be on the same page about the end 
> > goal.
> > 3. If bullet 2 is correct, how would you navigate through the graph? It 
> > would seem that you would need some way to navigate `(source stream, 
> > operator)` tuples?
> > 4. Related to bullet 3, why do the operators know about their output 
> > streams? Abstractly aren't they totally independent in the sense that I 
> > could apply the same operator to multiple input streams to produce a 
> > corresponding number of output streams?
> > 5. It looks like you can compose graphs in two ways: directly using the 
> > operators or using the interfaces on DataStream. I would choose one or the 
> > other and use appropriate hiding mechanisms to expose just the API the user 
> > should be concerned with. If you go with the latter I would extract 
> > interfaces for the operators (if that is even necessary) into a public 
> > package and hide everything else in a package private namespace.
> > 6. I suspect you want to limit the ability to create custom operators (at 
> > least if the assumption about how graph walking would work in bullet 2 
> > holds), so StreamOperator's constructor probably needs to be package 
> > private.
> > 
> > I suspect some overarching docs or a BlueJeans session would be very 
> > helpful in allowing me to dig deeper into this.

Thanks for the quick review comments! I am working on a design doc right now 
and will share it with the team soon. I will address your feedback in the 
design doc, and we can discuss it in our upcoming meeting.

Just some answers to your bullet points:
1. Sure. I will change it to use JDK constructs.
2. Yes. My overall thinking on the design is: a) stream operators are APIs that 
allow users to describe the functions to be applied to an input DataStream, 
with the result sent to an output DataStream; b) DataStream is the construct 
used to compose the DAG of operators; c) there will be an implementation class 
for each stream operator that actually applies the user-defined processing 
function to the input tuples (in SAMZA-915, RB rb47994). These implementation 
classes know how to build themselves from a descriptive stream operator, which 
contains only the operator's metadata. I am not sure what you mean by "graph 
processor", but I think the closest thing to it in the current RBs is the 
Pipeline class, which connects the implementation classes of all stream 
operators when Pipeline.create() is invoked. This definitely needs a design 
doc, and I am working on it, as Kartik also suggested. Thanks for urging me on 
this, and please let me know if you have suggestions for simplifying the 
design.
3. Yes. I have used a reactive stream library to create the base class for all 
operator implementations, which I use in rb47994 to navigate through DAGs.
4. The reason I associate one output DataStream with each operator instance is 
the following. Although the descriptive Function in each StreamOperator is 
independent of the input/output DataStreams, each instance of StreamOperator 
represents an actual transformation stage from one input DataStream to one 
output DataStream. The special case is Joiner, which I plan to implement as a 
meta-class that holds the overall JoinCondition and JoinFunction but, 
internally, is implemented as two PartialJoinOperators, each of which takes one 
input, joins it w/ buffered tuples from the other input, and sends the result 
to the common output R. Hence, when we translate the whole DAG into operator 
implementation classes, each operator instance has only one input and one 
output. We have discussed the "split" operator before and concluded that it can 
be implemented by applying different filters to the common output of an 
operator. W/ this, I figure that we can use operators that implement only 
1-input-1-output to construct arbitrary m-input-n-output DAG nodes. Please let 
me know if you see better options.
5. W/ the current design, I am leaning toward using DataStream to compose the 
DAG. That is why the stream operators are just metadata describing the 
transformation function in each stage, and are embedded in the 
DataStream.xxx() functions. I assume you are suggesting that we hide the stream 
operator APIs from the user? One question I have is: how do we allow users to 
describe custom window/join/scan functions? Maybe instead of calling the 
descriptive stream operator classes "StreamOperator", we should just call them 
"function descriptions". That would make it clear that the user-facing APIs for 
constructing a DAG are just DataStream and the "functions" that perform 
DataStream transformations.
6. Yes. Thanks for the suggestion! I will change that.
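
To make points 2, 5, and 6 concrete, here is a minimal sketch of the split 
between a descriptive operator and its implementation class. It is not the code 
in this RB: MapSpec, MapImpl, SketchStream, and SketchPipeline are hypothetical 
names made up for illustration, and only a map operator is shown. It assumes 
Java 8's java.util.function.Function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class DeclarativeOperatorSketch {

    /** Hypothetical descriptor: carries the user function, does no processing. */
    static final class MapSpec<I, O> {
        final Function<I, O> fn;
        // Not public, so users cannot create operator descriptors directly.
        MapSpec(Function<I, O> fn) { this.fn = fn; }
    }

    /** Hypothetical implementation class: builds itself from a descriptor. */
    static final class MapImpl<I, O> {
        private final Function<I, O> fn;
        MapImpl(MapSpec<I, O> spec) { this.fn = spec.fn; }
        O process(I tuple) { return fn.apply(tuple); }
    }

    /** Hypothetical stream handle (source type S, current type T). */
    static final class SketchStream<S, T> {
        final List<MapSpec<?, ?>> specs; // operators recorded so far, in order
        private SketchStream(List<MapSpec<?, ?>> specs) { this.specs = specs; }

        static <S> SketchStream<S, S> source() {
            return new SketchStream<>(new ArrayList<>());
        }

        /** Records a descriptor only; nothing is executed here. */
        <O> SketchStream<S, O> map(Function<T, O> fn) {
            List<MapSpec<?, ?>> next = new ArrayList<>(specs);
            next.add(new MapSpec<>(fn));
            return new SketchStream<>(next);
        }
    }

    /** Hypothetical stand-in for a create() step that wires up implementations. */
    static final class SketchPipeline {
        @SuppressWarnings("unchecked")
        static <S, T> Function<S, T> create(SketchStream<S, T> stream) {
            Function<Object, Object> chain = Function.identity();
            for (MapSpec<?, ?> spec : stream.specs) {
                // Unchecked cast: the types were checked when map() was called.
                MapImpl<Object, Object> impl =
                    new MapImpl<>((MapSpec<Object, Object>) spec);
                chain = chain.andThen(impl::process);
            }
            return (Function<S, T>) chain;
        }
    }

    /** Describes trim-then-length declaratively, then builds and runs it. */
    static int describeAndRun(String input) {
        SketchStream<String, Integer> described =
            SketchStream.<String>source().map(String::trim).map(String::length);
        Function<String, Integer> runnable = SketchPipeline.create(described);
        return runnable.apply(input);
    }

    public static void main(String[] args) {
        System.out.println(describeAndRun("  samza  ")); // prints 5
    }
}
```

In this sketch the user only touches SketchStream and plain functions; the 
descriptor and implementation classes stay package private.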

Please let me know if you have suggestions/comments on the above points so that 
I can incorporate them in the design doc and the next update for this RB.
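
As an illustration of the "split" idea in point 4, the sketch below builds a 
1-input-2-output node from two 1-input-1-output filter stages that read the 
same upstream output. FilterStage and splitEvenOdd are hypothetical names used 
only here, and an in-memory list stands in for a DataStream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class SplitByFilterSketch {

    /** Hypothetical 1-input-1-output stage that keeps matching tuples. */
    static final class FilterStage<T> {
        private final Predicate<T> keep;
        private final List<T> output = new ArrayList<>();

        FilterStage(Predicate<T> keep) { this.keep = keep; }

        void onNext(T tuple) {
            if (keep.test(tuple)) {
                output.add(tuple);
            }
        }

        List<T> output() { return output; }
    }

    /** Splits one upstream output into two outputs using two filters. */
    static List<List<Integer>> splitEvenOdd(List<Integer> upstream) {
        FilterStage<Integer> evens = new FilterStage<>(n -> n % 2 == 0);
        FilterStage<Integer> odds = new FilterStage<>(n -> n % 2 != 0);
        for (Integer n : upstream) {
            // Every tuple is offered to both stages; each stage decides alone.
            evens.onNext(n);
            odds.onNext(n);
        }
        List<List<Integer>> result = new ArrayList<>();
        result.add(evens.output());
        result.add(odds.output());
        return result;
    }

    public static void main(String[] args) {
        List<Integer> upstream = new ArrayList<>();
        for (int i = 1; i <= 4; i++) {
            upstream.add(i);
        }
        System.out.println(splitEvenOdd(upstream)); // prints [[2, 4], [1, 3]]
    }
}
```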


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review142421
-----------------------------------------------------------


On July 19, 2016, 6:04 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
> 
> (Updated July 19, 2016, 6:04 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-914: initial draft of operator programming API.
> 
> 
> Diffs
> -----
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed 
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb 
>   samza-operator/src/main/java/org/apache/samza/operators/api/FlatMapper.java 
> PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Mapper.java 
> PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Scanner.java 
> PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Sink.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/data/DataStream.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/join/Joiner.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/task/DataStreamTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/api/TestScanner.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/api/data/TestDataStream.java
>  PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/DataStreamJoinTask.java 
> PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/DataStreamSplitTask.java 
> PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/DataStreamUserTask.java 
> PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/TestDataStreamTask.java 
> PRE-CREATION 
>   
> samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
>  PRE-CREATION 
>   samza-sql-core/README.md PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
>  PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
>  PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
>  PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
>  PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
>  PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 
> PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
>  PRE-CREATION 
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2 
> 
> Diff: https://reviews.apache.org/r/47835/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>
