> On July 18, 2016, 8:01 p.m., Chris Pettitt wrote:
> > The description says this is the initial draft implementation, but the 
> > title says initial draft for the APIs. I take it the latter is more 
> > accurate?
> > 
> > Some initial high-level thoughts:
> > 
> > 1. We should use Java 8 constructs where possible (if we've moved to Java 
> > 8). For example, Function. If not, we should probably have some Function 
> > equivalent and various type specifications for map, flatMap, etc.
> > 2. The stream operators don't seem to **do** anything - they primarily 
> > appear to hold metadata about something to be done. Are they intended only 
> > to be declarative? Are they also not intended to know directly how to build 
> > themselves, but rather you would run some graph processor (with knowledge 
> > about each operator type) to build out a real processor? This is pretty 
> > fundamental to the design. If there is a doc that covers this let me know; 
> > otherwise it would be super helpful to be on the same page about the end 
> > goal.
> > 3. If bullet 2 is correct, how would you navigate through the graph? It 
> > would seem that you would need some way to navigate `(source stream, 
> > operator)` tuples?
> > 4. Related to bullet 3, why do the operators know about their output 
> > streams? Abstractly aren't they totally independent in the sense that I 
> > could apply the same operator to multiple input streams to produce a 
> > corresponding number of output streams?
> > 5. It looks like you can compose graphs in two ways: directly using the 
> > operators or using the interfaces on DataStream. I would choose one or the 
> > other and use appropriate hiding mechanisms to expose just the API the user 
> > should be concerned with. If you go with the latter I would extract 
> > interfaces for the operators (if that is even necessary) into a public 
> > package and hide everything else in a package private namespace.
> > 6. I suspect you want to limit the ability to create custom operators (at 
> > least if the assumption about how graph walking would work in bullet 2 
> > holds), so StreamOperator's constructor probably needs to be package 
> > private.
> > 
> > I suspect some overarching docs or a BlueJeans session would be very 
> > helpful in allowing me to dig deeper into this.
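A minimal sketch of how bullets 1, 2, 4, and 6 could fit together, assuming the project has moved to Java 8 and can use `java.util.function`. The class names (`OperatorSpec`, `MapSpec`, `FlatMapSpec`, `FilterSpec`) are hypothetical and are not the classes in this RB. Each spec only carries the user function (bullet 2) and holds no reference to an input or output stream (bullet 4). The package-private constructor on the abstract base stops code outside the package from adding new operator kinds (bullet 6).

```java
import java.util.Collection;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical names, for illustration only. A spec is pure metadata: it carries
// the user function but no reference to any input or output stream, so one
// spec instance can be applied to several input streams.
abstract class OperatorSpec<M, OM> {
  // Package-private: classes outside this package cannot subclass OperatorSpec,
  // so a graph builder only has to handle the spec types defined here.
  OperatorSpec() { }
}

// One input message -> exactly one output message.
final class MapSpec<M, OM> extends OperatorSpec<M, OM> {
  final Function<? super M, ? extends OM> fn;

  MapSpec(Function<? super M, ? extends OM> fn) {
    this.fn = fn;
  }
}

// One input message -> zero or more output messages.
final class FlatMapSpec<M, OM> extends OperatorSpec<M, OM> {
  final Function<? super M, ? extends Collection<OM>> fn;

  FlatMapSpec(Function<? super M, ? extends Collection<OM>> fn) {
    this.fn = fn;
  }
}

// Keeps only the messages that satisfy the predicate.
final class FilterSpec<M> extends OperatorSpec<M, M> {
  final Predicate<? super M> predicate;

  FilterSpec(Predicate<? super M> predicate) {
    this.predicate = predicate;
  }
}
```

Because a spec holds no stream references, the same `MapSpec` instance could be attached to several input streams; a graph builder would then turn each (input stream, spec) pair into its own runtime operator.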
> 
> Yi Pan (Data Infrastructure) wrote:
>     Thanks for the quick review comments! I am working on a design doc right 
> now and will share it w/ the team soon. I will incorporate your feedback into 
> the design doc. We can discuss it in our upcoming meeting.
>     
>     Just some answers to your bullet points:
>     1. Sure. I will change it to use JDK constructs.
>     2. Yes. My overall thoughts on the design are: a) stream operators are 
> APIs that allow users to describe the functions to be applied to an input 
> DataStream, with the results written to an output DataStream; b) DataStream is 
> the construct used to compose the DAG of operators; c) there will be 
> implementation classes for each stream operator that actually apply the 
> user-defined processing functions to the input tuples (in SAMZA-915 RB 
> rb47994). These implementation classes know how to build themselves from a 
> descriptive stream operator, which just contains the metadata of the operator. 
> I am not sure what you mean by "graph processor", but I think the closest 
> thing to it in the current RBs would be the Pipeline class, which connects all 
> implementation classes of stream operators when Pipeline.create() is invoked. 
> This definitely needs a design doc, and I am working on it as suggested by 
> Kartik as well. Thanks for urging me on this, and please let me know if you 
> have better suggestions to simplify the design.
>     3. Yes. I have used a reactive streams library to create the base class 
> for all operator implementations, which I use in rb47994 to navigate through DAGs.
>     4. The reason that I associate one output DataStream with each operator 
> instance is the following. Although the descriptive Function in each 
> StreamOperator is independent of the input/output DataStreams, each instance 
> of StreamOperator represents an actual transformation stage from one input 
> DataStream to one output DataStream. The special case is Joiner, which I plan 
> to implement as a meta-class that holds the overall JoinCondition and 
> JoinFunction but internally implements two PartialJoinOperators. Each of these 
> takes one input, joins it w/ the buffered tuples from the other input, and 
> sends the results to the common output R. Hence, when we translate the whole 
> DAG into operator implementation classes, each operator instance has only one 
> input and one output. We have discussed the "split" operator before and 
> concluded that it can be implemented by applying different filters to the 
> common output of an operator. W/ this, I figure that we can use operators that 
> only implement 1-input-1-output to construct arbitrary m-input-n-output DAG 
> nodes. Please let me know if you see better options.
>     5. I think w/ the current design, I am leaning toward using the 
> DataStream to compose the DAG. That's why the stream operators are just 
> metadata describing the transformation function in each stage, and are 
> embedded in the DataStream.xxx() functions. I assume you are suggesting that 
> we hide the stream operator APIs from the user? One question I have is: how do 
> we allow users to describe custom window/join/scan functions? Maybe instead of 
> calling the descriptive stream operator classes "StreamOperator", we should 
> just call them "function descriptions". Then it would be clear that the 
> user-facing APIs to construct the DAG are just DataStream and the "functions" 
> that perform DataStream transformations?
>     6. Yes. Thanks for the suggestion! I will change that.
>     
>     Please let me know if you have suggestions/comments on the above points 
> so that I can incorporate them into the design doc and the next update of this RB.
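A minimal sketch of the Joiner idea in point 4, for illustration only: one join description is executed by two 1-input-1-output partial joins that share buffers and emit into a common output. It assumes an equi-join on a key extractor in place of a general JoinCondition, and it keeps unbounded in-memory buffers where a real implementation would bound them with a window. The names `PartialJoin` and `EquiJoin` are hypothetical, not the classes in rb47994.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch: one side of a stream-stream equi-join. It has a single
// input (this side's messages) and a single output shared with the other side.
final class PartialJoin<K, A, B, OUT> {
  private final Function<A, K> keyOf;         // join key of this side's messages
  private final Map<K, List<A>> myBuffer;      // buffered messages from this side
  private final Map<K, List<B>> otherBuffer;   // buffered messages from the other side
  private final BiFunction<A, B, OUT> joinFn;  // combines one matching pair
  private final Consumer<OUT> output;          // common output stream

  PartialJoin(Function<A, K> keyOf, Map<K, List<A>> myBuffer, Map<K, List<B>> otherBuffer,
      BiFunction<A, B, OUT> joinFn, Consumer<OUT> output) {
    this.keyOf = keyOf;
    this.myBuffer = myBuffer;
    this.otherBuffer = otherBuffer;
    this.joinFn = joinFn;
    this.output = output;
  }

  // Buffer the new message, then join it with every buffered match from the other side.
  void onMessage(A msg) {
    K key = keyOf.apply(msg);
    myBuffer.computeIfAbsent(key, k -> new ArrayList<>()).add(msg);
    for (B other : otherBuffer.getOrDefault(key, Collections.emptyList())) {
      output.accept(joinFn.apply(msg, other));
    }
  }
}

// Hypothetical "meta-class": holds the overall join description and wires two
// PartialJoins that share buffers, so each runtime operator stays 1-in/1-out.
final class EquiJoin<K, L, R, OUT> {
  final PartialJoin<K, L, R, OUT> left;
  final PartialJoin<K, R, L, OUT> right;

  EquiJoin(Function<L, K> leftKey, Function<R, K> rightKey,
      BiFunction<L, R, OUT> joinFn, Consumer<OUT> output) {
    Map<K, List<L>> leftBuffer = new HashMap<>();
    Map<K, List<R>> rightBuffer = new HashMap<>();
    left = new PartialJoin<K, L, R, OUT>(leftKey, leftBuffer, rightBuffer, joinFn, output);
    // The right side swaps the buffers and the argument order of the join function,
    // so results always come out as joinFn(leftMessage, rightMessage).
    right = new PartialJoin<K, R, L, OUT>(rightKey, rightBuffer, leftBuffer,
        (r, l) -> joinFn.apply(l, r), output);
  }
}
```

Whichever side a message arrives on, only that side's PartialJoin runs, so the translated DAG still contains nothing but single-input, single-output operators.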

Another thought on 5: I think I understand your suggestion now. If we remove 
the output DataStream from the StreamOperator (I may rename it later to 
reflect that it is really just a metadata description of a function) and move 
it into DataStream altogether (i.e. each DataStream holds its downstream 
DataStreams as a collection of <function-description, output-datastream> 
tuples), we can completely remove the stream operator from the user interface 
and just use the <function-description, output-datastream> tuples to construct 
the actual operators that implement the DAG execution. If that is what you 
meant, I will go ahead and implement it. Thanks!
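A minimal sketch of the proposal above, assuming hypothetical names (`Edge`, `DagRunner`) and a single "one input, zero or more outputs" shape for every function description. User code only touches `DataStream`; each stream stores its downstream edges as <function-description, output-datastream> pairs, and a separate runner (standing in for the implementation side, roughly the role Pipeline.create() plays in the RB) walks those pairs to push a message through the DAG.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical <function-description, output-datastream> pair. Every description
// is reduced to "one input -> zero or more outputs" to keep the sketch small.
final class Edge<I, O> {
  final Function<I, List<O>> fn;
  final DataStream<O> output;

  Edge(Function<I, List<O>> fn, DataStream<O> output) {
    this.fn = fn;
    this.output = output;
  }
}

// User-facing DAG construction: only DataStream methods, no operator classes.
final class DataStream<M> {
  final List<Edge<M, ?>> edges = new ArrayList<>();
  final List<Consumer<M>> sinks = new ArrayList<>();

  <OM> DataStream<OM> map(Function<M, OM> fn) {
    return addEdge(m -> Collections.singletonList(fn.apply(m)));
  }

  DataStream<M> filter(Predicate<M> p) {
    return addEdge(m -> p.test(m) ? Collections.singletonList(m) : Collections.<M>emptyList());
  }

  void sink(Consumer<M> consumer) {
    sinks.add(consumer);
  }

  // Records the description together with a fresh downstream stream.
  private <O> DataStream<O> addEdge(Function<M, List<O>> fn) {
    DataStream<O> out = new DataStream<>();
    edges.add(new Edge<>(fn, out));
    return out;
  }
}

// Hypothetical stand-in for the implementation side: walks the stored pairs
// and pushes one message through the DAG, depth first.
final class DagRunner {
  static <M> void push(DataStream<M> stream, M msg) {
    for (Consumer<M> sink : stream.sinks) {
      sink.accept(msg);
    }
    for (Edge<M, ?> edge : stream.edges) {
      propagate(edge, msg);
    }
  }

  // Helper that captures the edge's output type so its results can be pushed on.
  private static <M, O> void propagate(Edge<M, O> edge, M msg) {
    for (O out : edge.fn.apply(msg)) {
      push(edge.output, out);
    }
  }
}
```

Attaching `filter(n -> n > 2)` and `filter(n -> n <= 2)` to the same stream gives the two-way "split" from point 4 without a dedicated split operator, since a DataStream can hold any number of downstream pairs.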


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review142421
-----------------------------------------------------------


On July 19, 2016, 6:04 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
> 
> (Updated July 19, 2016, 6:04 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-914: initial draft of operator programming API.
> 
> 
> Diffs
> -----
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb
>   samza-operator/src/main/java/org/apache/samza/operators/api/FlatMapper.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/Mapper.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/Scanner.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/Sink.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/data/DataStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/join/Joiner.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/task/DataStreamTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestScanner.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/data/TestDataStream.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/DataStreamJoinTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/DataStreamSplitTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/DataStreamUserTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/TestDataStreamTask.java PRE-CREATION
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java PRE-CREATION
>   samza-sql-core/README.md PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2
> 
> Diff: https://reviews.apache.org/r/47835/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>
