> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > Hi Milinda, sorry for the late review. I have put my comments below.
> > Overall, there are two things to discuss:
> > 1) Adding an OperatorBuilder interface as well. It serves two purposes:
> >    a) I remember that we discussed the need for this because, in the
> > parsing/planning phase, there are cases where the required parameters for
> > an operator are not yet generated or finalized (which is why you added some
> > setter functions to OperatorSpec as a workaround). With an OperatorBuilder,
> > it is much easier: we just keep setting parameters and call build() only
> > once they are complete.
> >    b) In user code that uses the operator-layer API directly, an
> > OperatorBuilder makes the TopologyBuilder code more intuitive and hides
> > unnecessary specs such as intermediate stream/table names and/or operator
> > names.
> > 2) The implementation details of TopologyBuilder. I would still prefer to
> > keep a graph-based implementation of TopologyBuilder internally, instead of
> > a stack-based one, because of the more flexible representation a graph
> > allows. At the API level, we should first focus on DAG-like operators.
> > However, I would prefer to keep the implementation flexible to avoid having
> > to rewrite the TopologyBuilder class later, when we need to support
> > non-DAG-like operators. P.S. It would be good if you could modify the
> > example tasks to use the fluent-style APIs, to illustrate what the user
> > experience looks like. With the help of OperatorBuilder, the
> > TopologyBuilder implementation can achieve the following: if the user does
> > not specify the input/output streams/tables (as with DAG-like operators),
> > TopologyBuilder should be able to generate the intermediate stream/table
> > names and connect the operators via those intermediate streams/tables. We
> > must do this step anyway for DAG-like operators. If the user does specify
> > the input/output streams in the OperatorBuilder, the named streams/tables
> > are created as vertices in the graph, and operators are connected to those
> > vertices if they consume from those streams/tables. This is a simple
> > extension of the DAG model that does not require structural changes in the
> > TopologyBuilder.
> > 
> > Just my two cents. Thanks!
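
For illustration, the builder idea in point 1 could look roughly like the sketch below. `FilterBuilder` and `FilterSpec` are hypothetical stand-ins, not the classes in this patch. The point is that the planner can set parameters over several passes (for example, stream names resolved later), and validation is deferred to `build()`.

```java
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical immutable spec: only build() creates one, so it is always complete.
final class FilterSpec {
    final String id;
    final String input;
    final String output;
    final Predicate<Object> predicate;

    FilterSpec(String id, String input, String output, Predicate<Object> predicate) {
        this.id = id;
        this.input = input;
        this.output = output;
        this.predicate = predicate;
    }
}

// Hypothetical builder: parameters can be set in any order, across planning steps,
// without adding setters to the spec itself.
final class FilterBuilder {
    private String id;
    private String input;   // may stay unset until a TopologyBuilder assigns an intermediate stream
    private String output;
    private Predicate<Object> predicate;

    FilterBuilder id(String id) { this.id = id; return this; }
    FilterBuilder input(String input) { this.input = input; return this; }
    FilterBuilder output(String output) { this.output = output; return this; }
    FilterBuilder predicate(Predicate<Object> predicate) { this.predicate = predicate; return this; }

    boolean hasInput() { return input != null; }
    boolean hasOutput() { return output != null; }

    // Missing parameters fail here, at build time, not while the plan is still being assembled.
    FilterSpec build() {
        return new FilterSpec(
            Objects.requireNonNull(id, "id not set"),
            Objects.requireNonNull(input, "input stream not set"),
            Objects.requireNonNull(output, "output stream not set"),
            Objects.requireNonNull(predicate, "predicate not set"));
    }
}
```

A TopologyBuilder could check `hasInput()`/`hasOutput()` and fill in generated intermediate names before calling `build()`.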

Hi Yi,

I was trying to simplify the TopologyBuilder by using a stack-based approach,
because it can handle the logical algebra generated by Calcite. But I
understand your concerns about cases such as multiple operators reading from a
single intermediate stream. I'll do another revision of the code with your
suggestions.
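
A rough sketch of the graph-based approach Yi describes, assuming a hypothetical `TopologyGraph` class (not the TopologyBuilderV2 in this patch). Stream/table names are vertices. Each operator is recorded as a consumer of its input vertex. Unnamed outputs get generated intermediate names, and an operator with no explicit input chains from the previous output (the DAG-like case). Unlike a stack, this lets two operators read from the same intermediate stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical graph: stream/table names are vertices; operators connect an input vertex to an output vertex.
final class TopologyGraph {
    private final Map<String, List<String>> consumers = new LinkedHashMap<>(); // stream -> consuming operator ids
    private final Map<String, String> outputs = new LinkedHashMap<>();         // operator id -> output stream
    private int nextIntermediate = 0;
    private String lastOutput; // used to chain operators when no input is given

    /** Adds an operator; a null input/output means "let the builder decide". Returns the output stream name. */
    String addOperator(String opId, String input, String output) {
        String in = (input != null) ? input : lastOutput;
        if (in == null) {
            throw new IllegalStateException("operator " + opId + " has no input to chain from");
        }
        String out = (output != null) ? output : "intermediate-" + nextIntermediate++;
        consumers.computeIfAbsent(in, k -> new ArrayList<>()).add(opId);
        consumers.computeIfAbsent(out, k -> new ArrayList<>());
        outputs.put(opId, out);
        lastOutput = out;
        return out;
    }

    List<String> consumersOf(String stream) {
        return Collections.unmodifiableList(consumers.getOrDefault(stream, Collections.<String>emptyList()));
    }

    String outputOf(String opId) {
        return outputs.get(opId);
    }
}
```

For example, a scan whose unnamed output feeds both a chained filter and an explicitly wired projection leaves two consumers on `intermediate-0`.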


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java, 
> > line 161
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041451#file1041451line161>
> >
> >     My original intention in introducing the anonymous stream here was to
> > represent the intermediate streams/tables. If we explicitly introduce the
> > intermediate streams and tables in the following methods, I think that we
> > can drop the anonymous ones.

I was not clear about the purpose of the anonymous stream; that's why I added
the intermediate stream method.


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java, 
> > line 174
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041451#file1041451line174>
> >
> >     Could you elaborate on what needs to be fixed here?

I was not sure whether using a UUID as the intermediate stream name is the
right approach. I would prefer more meaningful names, in case we decide to
distribute operators across multiple jobs connected through Kafka topics.
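
To make the trade-off concrete, here is a sketch that contrasts a UUID-based name with a deterministic one built from the job name, operator id, and output index. The naming scheme and the `IntermediateStreamNames` class are assumptions for illustration. The only external constraint it encodes is Kafka's topic-name rule: ASCII letters, digits, `.`, `_` and `-`, at most 249 characters.

```java
import java.util.UUID;

// Hypothetical helper; the naming scheme is an illustration, not the one used in this patch.
final class IntermediateStreamNames {
    // Kafka rejects topic names longer than this.
    private static final int MAX_TOPIC_NAME_LENGTH = 249;

    private IntermediateStreamNames() {}

    // Unique, but a new name on every planning run: opaque in Kafka tooling
    // and not reproducible when the job is restarted.
    static String randomName() {
        return "intermediate-" + UUID.randomUUID();
    }

    // Deterministic: the same plan always yields the same human-readable topic name.
    static String meaningfulName(String jobName, String operatorId, int outputIndex) {
        String raw = jobName + "." + operatorId + ".out" + outputIndex;
        // Replace characters that Kafka does not allow in topic names.
        String name = raw.replaceAll("[^A-Za-z0-9._-]", "_");
        if (name.length() > MAX_TOPIC_NAME_LENGTH) {
            throw new IllegalArgumentException("topic name too long: " + name);
        }
        return name;
    }
}
```

Because the deterministic name depends only on the plan, a downstream job in a multi-job deployment can subscribe to it by name without sharing generated state.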


> On Aug. 24, 2015, 8:35 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java,
> >  line 16
> > <https://reviews.apache.org/r/37506/diff/2/?file=1041483#file1041483line16>
> >
> >     Question: I am not quite sure why we need this. Is it simply a
> > projection operator that sends its output directly to the system streams?

I was thinking in logical algebra terms. When there is an explicit insert in
the query, Calcite generates a LogicalInsert, and I was trying to mirror that in
our operator layer. However, I have started rewriting the planner I wrote
previously. After reading the Drill code, I found a better way to write the
planner, so I think I can get rid of the need for an explicit stream insert
operator. But AFAIK, we will still need an operator to modify tables.
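
The distinction between stream inserts and table modifications can be sketched as follows. `ModifyOperation`, `AppendOnlyStream` and `KeyedTable` are hypothetical and are not the contents of `Operation.java` or `TableModifySpec.java` in this patch. Appending to a stream needs no operation type, which is why it can reduce to a projection that writes to an output stream. A keyed table needs an explicit INSERT/UPDATE/DELETE.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical modify operations, for illustration only.
enum ModifyOperation { INSERT, UPDATE, DELETE }

// A stream is append-only: "inserting" is just forwarding the tuple to the output.
final class AppendOnlyStream {
    final List<String> messages = new ArrayList<>();

    void send(String message) {
        messages.add(message);
    }
}

// A table holds keyed state, so each change must say what it does to an existing row.
final class KeyedTable {
    final Map<String, String> rows = new HashMap<>();

    void apply(ModifyOperation op, String key, String value) {
        switch (op) {
            case INSERT:
                if (rows.putIfAbsent(key, value) != null) {
                    throw new IllegalStateException("duplicate key: " + key);
                }
                break;
            case UPDATE:
                if (rows.replace(key, value) == null) {
                    throw new IllegalStateException("missing key: " + key);
                }
                break;
            case DELETE:
                rows.remove(key);
                break;
        }
    }
}
```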


- Milinda


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/37506/#review96136
-----------------------------------------------------------


On Aug. 16, 2015, 3:57 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/37506/
> -----------------------------------------------------------
> 
> (Updated Aug. 16, 2015, 3:57 p.m.)
> 
> 
> Review request for samza, Yi Pan (Data Infrastructure) and Navina Ramesh.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> New proposal for the TopologyBuilder API proposed in rb34500
> (https://reviews.apache.org/r/34500/).
> 
> * Created a new class called TopologyBuilderV2 instead of changing existing 
> TopologyBuilder
> * org.apache.samza.sql.operators.factory.TestTopologyBuilderV2 contains two 
> tests which demonstrate the basic usage of the new API
> * Window and aggregate related draft APIs are not done yet
> * This is a WIP, please feel free to comment on the APIs
> * This contains Yi's changes from RB 34500
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 80ba455
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 7b4d984
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/ScalarExpression.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/TupleExpression.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java d6f6b57
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java fb2aa89
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java 0759638
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java c49a822
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java 72a59f2
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/NoopOperatorCallback.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorImpl.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleOperatorSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/SimpleRouter.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java c3d2266
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java cbc84d0
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java e66451f
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 56753b6
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java e570897
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderException.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilderV2.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/filter/FilterSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/JoinType.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamRelationJoin.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java 2854aeb
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java cc0aca0
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/InsertToStreamOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/Operation.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/StreamModifySpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/modify/TableModifySpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/FieldBasedPartitionKeyGenerator.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java b93d789
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java c47eed9
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScan.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScan.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/TableScanSpec.java PRE-CREATION
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java d81cc93
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java eec32ea
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java b29838a
>   samza-sql-core/src/test/java/org/apache/samza/sql/operators/factory/TestTopologyBuilderV2.java PRE-CREATION
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java 20dc701
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 9124e3c
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java 96e96c3
> 
> Diff: https://reviews.apache.org/r/37506/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew :samza-sql-core:test passed
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>
