> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > Hi, Milinda, sorry for the late review. I have put down my comments below.
> > Overall, there are two things to be discussed:
> >
> > 1) Adding an OperatorBuilder interface as well. It serves two purposes:
> >    a) I remember that we discussed the need for this because, in the
> >       parsing/planning phase, there are cases where the required parameters
> >       for the operator are not generated or finalized yet (hence you have
> >       added some setter functions in OperatorSpec as a workaround). With
> >       OperatorBuilder, it is much easier: we just keep setting the
> >       parameters without calling build().
> >    b) In user code that uses the operator layer API directly,
> >       OperatorBuilder can make the TopologyBuilder code more intuitive and
> >       hide all unnecessary specs, such as intermediate stream/table names
> >       and/or operator names.
> >
> > 2) The implementation details of TopologyBuilder. I would still prefer to
> >    keep a graph-based implementation of TopologyBuilder internally, instead
> >    of a stack-based one, because of the flexible representation a graph
> >    allows. At the API level, we should first focus on DAG-like operators.
> >    However, I would prefer to keep the implementation flexible so that we
> >    do not have to rewrite the TopologyBuilder class later, when we need to
> >    support non-DAG-like operators.
> >
> >    p.s. It would be good if you could modify the example tasks to use the
> >    fluent-style APIs to illustrate the user experience.
> >
> >    With the help of OperatorBuilder, the TopologyBuilder implementation
> >    can achieve the following: if the user does not specify the
> >    input/output streams/tables (as with DAG-like operators), TopologyBuilder
> >    should be able to generate the intermediate stream/table names and
> >    connect the operators via those intermediate streams/tables. This is a
> >    step we must do anyway for DAG-like operators. If the user specifies the
> >    input/output streams in the OperatorBuilder, the named streams/tables
> >    are created as vertices in the graph, and operators are connected to
> >    those vertices if they consume from those streams/tables. This is a
> >    simple extension of the DAG model that does not require structural
> >    changes in TopologyBuilder.
> >
> > Just my two cents. Thanks!
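A minimal Java sketch of the two ideas in this comment, under stated assumptions: `OperatorBuilder`, `TopologyBuilder`, `from`/`to`/`add`/`consumersOf`, and the `<operator>-out` naming scheme are all hypothetical stand-ins, not the samza-sql classes in this review. The builder collects parameters until `build()` is called. The topology keeps streams as graph vertices, generates an intermediate name when the user omits one, and lets a named stream feed more than one operator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: OperatorBuilder, TopologyBuilder and their methods are
// illustrative stand-ins, not the samza-sql classes under review.
public class GraphTopologySketch {

  /** Immutable result of OperatorBuilder.build(); plays the role of an OperatorSpec. */
  static final class OperatorSpec {
    final String name;
    final List<String> inputs;
    final String output;

    OperatorSpec(String name, List<String> inputs, String output) {
      this.name = name;
      this.inputs = List.copyOf(inputs);
      this.output = output;
    }
  }

  /** Mutable builder: parameters may be set in any order, and build() is called last. */
  static final class OperatorBuilder {
    final String name;
    final List<String> inputs = new ArrayList<>();
    String output; // null means "let the TopologyBuilder generate a name"

    OperatorBuilder(String name) { this.name = name; }

    OperatorBuilder from(String stream) { inputs.add(stream); return this; }

    OperatorBuilder to(String stream) { output = stream; return this; }

    OperatorSpec build() { return new OperatorSpec(name, inputs, output); }
  }

  /** Graph whose vertices are streams/tables; each operator links its inputs to its output. */
  static final class TopologyBuilder {
    private final Map<String, OperatorSpec> operators = new LinkedHashMap<>();
    // stream/table name -> names of the operators consuming it (fan-out allowed)
    private final Map<String, List<String>> consumers = new LinkedHashMap<>();
    private String lastOutput; // output vertex of the most recently added operator

    TopologyBuilder add(OperatorBuilder op) {
      // DAG-style chaining: with no explicit input, consume the previous operator's output.
      if (op.inputs.isEmpty() && lastOutput != null) {
        op.from(lastOutput);
      }
      // No explicit output: generate a readable intermediate stream name.
      if (op.output == null) {
        op.to(op.name + "-out");
      }
      OperatorSpec spec = op.build();
      operators.put(spec.name, spec);
      for (String input : spec.inputs) {
        consumers.computeIfAbsent(input, k -> new ArrayList<>()).add(spec.name);
      }
      consumers.putIfAbsent(spec.output, new ArrayList<>());
      lastOutput = spec.output;
      return this;
    }

    OperatorSpec spec(String operatorName) { return operators.get(operatorName); }

    List<String> consumersOf(String stream) {
      return consumers.getOrDefault(stream, List.of());
    }
  }

  public static void main(String[] args) {
    TopologyBuilder topology = new TopologyBuilder()
        .add(new OperatorBuilder("scan").from("orders"))
        .add(new OperatorBuilder("filter"))                    // input and output left unset
        .add(new OperatorBuilder("project").to("large-orders"))
        .add(new OperatorBuilder("audit").from("filter-out")); // second consumer, wired by name
    System.out.println(topology.spec("filter").inputs);
    System.out.println(topology.consumersOf("filter-out"));
  }
}
```

Because streams are vertices rather than stack entries, `filter-out` ends up with two consumers (`project` and `audit`) without any structural change to the builder. Deriving intermediate names from operator names, rather than UUIDs, is just one possible way to keep them readable.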
Hi Yi, I was trying to simplify TopologyBuilder by using a stack-based approach because it can handle the logical algebra generated by Calcite. But I understand your concerns about having multiple operators read from a single intermediate stream, etc. I'll do another revision of the code with your suggestions.

> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java, line 161
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041451#file1041451line161>
> >
> >     My original intention in introducing the anonymous stream here was to
> >     represent the intermediate streams/tables. If we explicitly introduce
> >     the intermediate streams and tables in the following methods, I think
> >     we can drop the anonymous ones.

I was not clear about the anonymous stream; that's why I added the intermediate stream method.

> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java, line 174
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041451#file1041451line174>
> >
> >     Could you elaborate more on what needs to be fixed here?

I was not sure whether using a UUID as the intermediate stream name is the right approach. I would prefer more meaningful names in case we decide to distribute operators across multiple jobs connected through Kafka topics.

> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java, line 16
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041483#file1041483line16>
> >
> >     Question: I am not quite sure why we need this. Is it simply a
> >     projection operator that sends its output directly to the system
> >     streams?

I was thinking in logical algebra terms. When there is an explicit insert in the query, Calcite generates a LogicalInsert.
I was trying to mirror that in our operator layer. However, I have started rewriting the planner I wrote previously. After reading the Drill code, I found a better way to write the planner, and I think I can get rid of the need for an explicit stream insert operator. But AFAIK, we will still need an operator to modify tables.

- Milinda

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/#review96136
-----------------------------------------------------------

On Aug. 16, 2015, 3:57 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37506/
> -----------------------------------------------------------
> 
> (Updated Aug. 16, 2015, 3:57 p.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> New proposal for the TopologyBuilder API proposed in rb34500
> (https://reviews.apache.org/r/34500/).
> 
> * Created a new class called TopologyBuilderV2 instead of changing existing
>   TopologyBuilder
> * org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two
>   tests which demonstrate the basic usage of the new API
> * Window and aggregate related draft APIs are not done yet
> * This is a WIP, please feel free to comment on the APIs
> * This contains Yi's changes from RB 34500
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea 
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a 
>   samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3 
> 
> Diff: https://reviews.apache.org/r/37506/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew :samza-sql-core:test passed
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>
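To make the stack-versus-graph trade-off discussed in this review concrete, here is a minimal Java sketch of a stack-based builder. It assumes the planner walks Calcite's tree-shaped logical plan in post-order. The class `StackTopologySketch`, its `scan`/`apply` methods, and the `-out` naming scheme are hypothetical and not part of the patch. Each operator pops its inputs and pushes its output, which fits a tree well. However, once an intermediate stream has been popped, no other operator can read it, and that is exactly the multiple-readers case raised in the review.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: StackTopologySketch and its methods are illustrative only,
// not the samza-sql TopologyBuilder.
public class StackTopologySketch {

  /** One wired operator: its name, the streams it reads and the stream it writes. */
  static final class Wiring {
    final String op;
    final List<String> inputs;
    final String output;

    Wiring(String op, List<String> inputs, String output) {
      this.op = op;
      this.inputs = inputs;
      this.output = output;
    }

    @Override
    public String toString() { return op + inputs + "->" + output; }
  }

  private final Deque<String> stack = new ArrayDeque<>();
  private final List<Wiring> wirings = new ArrayList<>();

  /** Leaf of the plan (e.g. a stream scan): push the source stream name. */
  StackTopologySketch scan(String stream) {
    stack.push(stream);
    return this;
  }

  /** Inner plan node: pop `arity` inputs, then push a generated output stream name. */
  StackTopologySketch apply(String op, int arity) {
    if (stack.size() < arity) {
      throw new IllegalStateException(op + " needs " + arity + " inputs on the stack");
    }
    List<String> inputs = new ArrayList<>();
    for (int i = 0; i < arity; i++) {
      inputs.add(0, stack.pop()); // prepend so inputs stay in left-to-right plan order
    }
    String output = op + "-out";
    wirings.add(new Wiring(op, inputs, output));
    stack.push(output);
    return this;
  }

  List<Wiring> wirings() { return wirings; }

  public static void main(String[] args) {
    // Post-order walk of a tree-shaped plan: project(filter(join(orders, products))).
    StackTopologySketch builder = new StackTopologySketch()
        .scan("orders")
        .scan("products")
        .apply("join", 2)
        .apply("filter", 1)
        .apply("project", 1);
    builder.wirings().forEach(System.out::println);
    // join-out was popped by filter, so no second operator can consume it.
  }
}
```

A graph keyed by stream name avoids this limitation, because a vertex can have any number of outgoing edges.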