> On July 18, 2016, 8:01 p.m., Chris Pettitt wrote:
> > The description says this is the initial draft implementation, but the title says initial draft for the APIs. I take it the latter is more accurate?
> >
> > Some initial high-level thoughts:
> >
> > 1. We should use Java 8 constructs where possible (if we've moved to Java 8). For example, Function. If not, we should probably have some Function equivalent and various type specifications for map, flatMap, etc.
> > 2. The stream operators don't seem to **do** anything; they primarily appear to hold metadata about something to be done. Are they intended only to be declarative? Are they also not intended to know directly how to build themselves, but rather you would run some graph processor (with knowledge about each operator type) to build out a real processor? This is pretty fundamental to the design. If there is a doc that covers this let me know; otherwise it would be super helpful to be on the same page about the end goal.
> > 3. If bullet 2 is correct, how would you navigate through the graph? It would seem that you would need some way to navigate `(source stream, operator)` tuples?
> > 4. Related to bullet 3, why do the operators know about their output streams? Abstractly, aren't they totally independent, in the sense that I could apply the same operator to multiple input streams to produce a corresponding number of output streams?
> > 5. It looks like you can compose graphs in two ways: directly using the operators or using the interfaces on DataStream. I would choose one or the other and use appropriate hiding mechanisms to expose just the API the user should be concerned with. If you go with the latter I would extract interfaces for the operators (if that is even necessary) into a public package and hide everything else in a package-private namespace.
> > 6. I suspect you want to limit the ability to create custom operators (at least if the assumption about how graph walking would work in bullet 2 holds), so StreamOperator's constructor probably needs to be package-private.
> >
> > I suspect some overarching docs or a BlueJeans session would be very helpful in allowing me to dig deeper into this.
>
> Yi Pan (Data Infrastructure) wrote:
> Thanks for the quick review comments! I am working on a design doc right now and will share it with the team soon. I will incorporate your feedback into the design doc, and we can discuss it in our upcoming meeting.
>
> Just some answers to your bullet points:
> 1. Sure. I will change it to use JDK constructs.
> 2. Yes. My overall thoughts on the design are: a) stream operators are APIs that allow users to describe the functions to be applied to an input DataStream and output to a DataStream; b) DataStream is the construct used to compose the DAG of operators; c) there will be implementation classes for each stream operator that actually apply the user-defined processing functions to the input tuples (in SAMZA-915 RB rb47994). These implementation classes know how to build themselves from a descriptive stream operator, which just contains the metadata of the operator. I am not sure what you refer to as a "graph processor", but I think the closest thing to it in the current RBs is the Pipeline class, which connects all implementation classes of the stream operators when Pipeline.create() is invoked. This definitely needs a design doc, and I am working on one, as Kartik also suggested. Thanks for urging me on this, and please let me know if you have better suggestions to simplify the design.
> 3. Yes. I have used a reactive stream library to create the base class for all operator implementations, which I used in rb47994 to navigate through DAGs.
> 4. I associate one output DataStream with each operator instance for the following reason. Although the descriptive Function in each StreamOperator is independent of the input/output DataStreams, each StreamOperator instance represents an actual transformation stage from one input DataStream to one output DataStream. The special case is Joiner, which I plan to implement as a meta-class that holds the overall JoinCondition and JoinFunction but internally implements two PartialJoinOperators; each takes one input, joins it with the buffered tuples from the other input, and sends the result to the common output R. Hence, when we translate the whole DAG into operator implementation classes, each operator instance has exactly one input and one output. We have discussed the "split" operator before and concluded that it can be implemented by applying different filters to the common output of an operator. With this, I figure that operators implementing only 1-input-1-output can be used to construct arbitrary m-input-n-output DAG nodes. Please let me know if you see better options.
> 5. With the current design, I am leaning toward using DataStream to compose the DAG. That is why the stream operators are just metadata describing the transformation function in each stage, and are embedded in the DataStream.xxx() functions. I assume you are suggesting that we hide the stream operator APIs from the user? One question I have is: how do we allow users to describe custom window/join/scan functions? Maybe instead of calling the descriptive stream operator classes "StreamOperator", we should just call them "function descriptions". That would make it clear that the user-facing API for constructing the DAG is just DataStream plus the "functions" that perform DataStream transformations?
> 6. Yes. Thanks for the suggestion! I will change that.
>
> Please let me know if you have suggestions or comments on the above points so that I can accommodate them in the design doc and the next update of this RB.
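Bullet 4 of the reply above describes Joiner as a meta-class that holds the overall join condition and join function but runs as two one-input PartialJoinOperators feeding a common output R. The following is a minimal sketch of that decomposition only, not the code in this RB: it assumes `java.util.function.BiPredicate` and `BiFunction` stand in for JoinCondition and JoinFunction (in the spirit of bullet 1), and the `onNext` method and the `left`/`right` fields are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

// Hypothetical shape of a partial join: it consumes exactly one input stream.
interface PartialJoinOperator<M> {
  void onNext(M message);
}

// Hypothetical "meta-class": holds the overall join condition and join function,
// but executes as two one-input partial joins that share the common output R.
final class Joiner<A, B, R> {
  // Unbounded buffers keep the sketch short; a real stream-stream join would evict by window.
  private final List<A> leftBuffer = new ArrayList<>();
  private final List<B> rightBuffer = new ArrayList<>();

  final PartialJoinOperator<A> left;
  final PartialJoinOperator<B> right;

  Joiner(BiPredicate<A, B> condition, BiFunction<A, B, R> joinFn, Consumer<R> output) {
    // Left input: buffer the tuple, then join it against everything buffered on the right.
    this.left = a -> {
      leftBuffer.add(a);
      for (B b : rightBuffer) {
        if (condition.test(a, b)) {
          output.accept(joinFn.apply(a, b));
        }
      }
    };
    // Right input: the mirror image, emitting to the same output.
    this.right = b -> {
      rightBuffer.add(b);
      for (A a : leftBuffer) {
        if (condition.test(a, b)) {
          output.accept(joinFn.apply(a, b));
        }
      }
    };
  }
}

class JoinerDemo {
  public static void main(String[] args) {
    List<String> out = new ArrayList<>();
    // Join a word with a number when the word's length equals the number.
    Joiner<String, Integer, String> joiner =
        new Joiner<>((s, n) -> s.length() == n, (s, n) -> s + ":" + n, out::add);
    joiner.left.onNext("ab");   // nothing buffered on the right yet, no output
    joiner.right.onNext(2);     // matches the buffered "ab"
    joiner.right.onNext(3);     // no buffered word of length 3 yet
    joiner.left.onNext("xyz");  // matches the buffered 3
    System.out.println(out);    // [ab:2, xyz:3]
  }
}
```

Because each partial join has exactly one input, a two-input join fits into a DAG built only from 1-input-1-output stages, which is the property bullet 4 relies on.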
Another thought on 5: I think I understand your suggestion now. If we remove the output DataStream from the StreamOperator (I may rename it later to reflect that it is really just a metadata description of a function) and move it entirely into DataStream (i.e. a DataStream holds its downstream DataStreams as a collection of <function-description, output-datastream> tuples in the DataStream object itself), we can remove the stream operator from the user interface completely and just use the <function-description, output-datastream> tuples to construct the actual operators that implement the DAG execution. If that is what you meant, I will go ahead and implement it. Thanks!

- Yi

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review142421
-----------------------------------------------------------

On July 19, 2016, 6:04 p.m., Yi Pan (Data Infrastructure) wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
>
> (Updated July 19, 2016, 6:04 p.m.)
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
>
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
>
> Repository: samza
>
> Description
> -------
>
> SAMZA-914: initial draft of operator programming API.
> > > Diffs > ----- > > build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed > gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb > samza-operator/src/main/java/org/apache/samza/operators/api/FlatMapper.java > PRE-CREATION > samza-operator/src/main/java/org/apache/samza/operators/api/Mapper.java > PRE-CREATION > samza-operator/src/main/java/org/apache/samza/operators/api/Scanner.java > PRE-CREATION > samza-operator/src/main/java/org/apache/samza/operators/api/Sink.java > PRE-CREATION > > samza-operator/src/main/java/org/apache/samza/operators/api/data/DataStream.java > PRE-CREATION > > samza-operator/src/main/java/org/apache/samza/operators/api/join/Joiner.java > PRE-CREATION > > samza-operator/src/main/java/org/apache/samza/operators/api/window/SessionWindow.java > PRE-CREATION > > samza-operator/src/main/java/org/apache/samza/operators/api/window/Window.java > PRE-CREATION > samza-operator/src/main/java/org/apache/samza/operators/impl/Pipeline.java > PRE-CREATION > > samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java > PRE-CREATION > samza-operator/src/main/java/org/apache/samza/task/DataStreamTask.java > PRE-CREATION > > samza-operator/src/test/java/org/apache/samza/operators/api/TestScanner.java > PRE-CREATION > > samza-operator/src/test/java/org/apache/samza/operators/api/data/TestDataStream.java > PRE-CREATION > samza-operator/src/test/java/org/apache/samza/task/DataStreamJoinTask.java > PRE-CREATION > samza-operator/src/test/java/org/apache/samza/task/DataStreamSplitTask.java > PRE-CREATION > samza-operator/src/test/java/org/apache/samza/task/DataStreamUserTask.java > PRE-CREATION > samza-operator/src/test/java/org/apache/samza/task/TestDataStreamTask.java > PRE-CREATION > > samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java > PRE-CREATION > samza-sql-core/README.md PRE-CREATION > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java > 
PRE-CREATION > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java > PRE-CREATION > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java > PRE-CREATION > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java > PRE-CREATION > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java > PRE-CREATION > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java > PRE-CREATION > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java > PRE-CREATION > samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java > PRE-CREATION > samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java > PRE-CREATION > > samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java > PRE-CREATION > > 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2
>
> Diff: https://reviews.apache.org/r/47835/diff/
>
> Testing
> -------
>
> ./gradlew clean build
>
> Thanks,
>
> Yi Pan (Data Infrastructure)
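The "Another thought on 5" proposal in this reply (each DataStream keeping its downstream edges as <function-description, output-datastream> tuples, with no StreamOperator in the user-facing API) can be sketched roughly as follows. This is a hypothetical illustration, not the DataStream class in this RB: the `Edge` type and the `flatMap`/`map`/`filter`/`sink`/`push` methods are assumptions, `java.util.function` types stand in for the function descriptions (per bullet 1), and `push` is a stand-in for the runtime side (the role Pipeline and the operator implementation classes would play). The "split" from bullet 4 is shown as two filters over the same upstream output.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical DataStream: downstream edges are <function-description, output-datastream> pairs.
final class DataStream<M> {

  // One edge. The flatMap shape (0..n outputs per input) also expresses map and filter.
  private static final class Edge<I, O> {
    final Function<I, Collection<O>> function;
    final DataStream<O> output;

    Edge(Function<I, Collection<O>> function, DataStream<O> output) {
      this.function = function;
      this.output = output;
    }

    void apply(I message) {
      for (O result : function.apply(message)) {
        output.push(result);
      }
    }
  }

  private final List<Edge<M, ?>> downstream = new ArrayList<>();
  private final List<Consumer<M>> sinks = new ArrayList<>();

  <R> DataStream<R> flatMap(Function<M, Collection<R>> function) {
    DataStream<R> output = new DataStream<>();
    downstream.add(new Edge<>(function, output));
    return output;
  }

  <R> DataStream<R> map(Function<M, R> function) {
    return flatMap(m -> Collections.singletonList(function.apply(m)));
  }

  DataStream<M> filter(Predicate<M> predicate) {
    return flatMap(m -> predicate.test(m) ? Collections.singletonList(m) : Collections.<M>emptyList());
  }

  void sink(Consumer<M> consumer) {
    sinks.add(consumer);
  }

  // Stand-in for the runtime: pushes one message through every downstream edge and sink.
  void push(M message) {
    for (Edge<M, ?> edge : downstream) {
      edge.apply(message);
    }
    for (Consumer<M> consumer : sinks) {
      consumer.accept(message);
    }
  }
}

class DataStreamDemo {
  public static void main(String[] args) {
    DataStream<Integer> input = new DataStream<>();
    DataStream<Integer> doubled = input.map(x -> x * 2);
    List<Integer> small = new ArrayList<>();
    List<Integer> large = new ArrayList<>();
    // "Split" expressed as two filters over the same upstream output.
    doubled.filter(x -> x < 10).sink(small::add);
    doubled.filter(x -> x >= 10).sink(large::add);
    for (int i : new int[] {1, 4, 7}) {
      input.push(i);
    }
    System.out.println(small + " " + large); // [2, 8] [14]
  }
}
```

In this shape the user only ever touches DataStream and plain functions; a graph walker can enumerate the (input stream, function, output stream) triples from the `downstream` lists, which is one possible answer to the navigation question in bullet 3.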