-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33280/#review83250
-----------------------------------------------------------

Looks good overall. Thanks for adding those end-to-end integration tests! It is great!


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/ExecutionPlanner.java
<https://reviews.apache.org/r/33280/#comment134168>

    In the new operator API, I made the following two major changes:
    - RelationOperator and TupleOperator are merged into a single Operator interface, which should simplify the code here quite a bit with:
        SimpleOperator operator = operatorFactory.getOperator(spec);
    - OperatorRouter is now a sub-class of Operator as well, with a simplified interface:
        router.addOperator(operator);

    The idea is to model SimpleOperator as the basic operator and OperatorRouter as a composite operator that includes a set of connected SimpleOperators. The only apparent difference between OperatorRouter and SimpleOperator is that SimpleOperator has a mandatory OperatorSpec and OperatorRouter does not (I am still debating whether they should both have an OperatorSpec).

    What do you think from the parser/planner implementation point of view? Do the above changes make your code simpler?

    And one more: do you see a strong need to allow nested OperatorRouters in the query planner use cases? I did not add support for that because a) I want to keep the code simple; and b) I don't see a strong use case for a Samza task that has to use multiple levels of nested OperatorRouters. Your opinion would be appreciated here. Thanks!


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/RexToJavaCompiler.java
<https://reviews.apache.org/r/33280/#comment134146>

    nit: Could you add a few lines of explanation on how the InputGetter and the correlates are used? A link to the Calcite javadoc here would be just as good.


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/RexToJavaCompiler.java
<https://reviews.apache.org/r/33280/#comment134197>

    nit: So, the built-in expressions include unary and binary versions? Could you give some examples here?

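To make the composite idea from the ExecutionPlanner.java comment above more concrete, here is a minimal sketch. It is an illustration only, not the actual samza-sql-core API: apart from the names SimpleOperator, OperatorRouter and addOperator, everything (the String tuples, the process signature, FilterOp, UpperCaseOp, the linear chaining) is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch of the composite shape; tuples are plain Strings here.
// Only the names SimpleOperator, OperatorRouter and addOperator come from the
// review; everything else is an assumption, not the samza-sql-core API.
final class OperatorCompositeSketch {

    /** The single interface shared by basic and composite operators. */
    interface Operator {
        /** Processes one tuple and returns the resulting tuples (possibly none). */
        List<String> process(String tuple);
    }

    /** Basic operator; the real one would also carry its mandatory OperatorSpec. */
    interface SimpleOperator extends Operator { }

    /** Basic operator that keeps only tuples matching a condition. */
    static final class FilterOp implements SimpleOperator {
        private final Predicate<String> condition;

        FilterOp(Predicate<String> condition) {
            this.condition = condition;
        }

        @Override
        public List<String> process(String tuple) {
            List<String> out = new ArrayList<>();
            if (condition.test(tuple)) {
                out.add(tuple);
            }
            return out;
        }
    }

    /** Basic operator that upper-cases every tuple. */
    static final class UpperCaseOp implements SimpleOperator {
        @Override
        public List<String> process(String tuple) {
            List<String> out = new ArrayList<>();
            out.add(tuple.toUpperCase());
            return out;
        }
    }

    /** Composite operator, modeled as a linear chain; a real router would hold a graph. */
    static final class OperatorRouter implements Operator {
        private final List<SimpleOperator> operators = new ArrayList<>();

        // Accepting only SimpleOperator rules out nested routers by construction.
        void addOperator(SimpleOperator operator) {
            operators.add(operator);
        }

        @Override
        public List<String> process(String tuple) {
            List<String> current = new ArrayList<>();
            current.add(tuple);
            // Feed the output of each operator into the next one.
            for (SimpleOperator operator : operators) {
                List<String> next = new ArrayList<>();
                for (String t : current) {
                    next.addAll(operator.process(t));
                }
                current = next;
            }
            return current;
        }
    }

    /** Builds a filter -> upper-case chain and runs one tuple through it. */
    static List<String> run(String tuple) {
        OperatorRouter router = new OperatorRouter();
        router.addOperator(new FilterOp(t -> t.startsWith("order")));
        router.addOperator(new UpperCaseOp());
        return router.process(tuple);
    }

    public static void main(String[] args) {
        System.out.println(run("order-1"));
        System.out.println(run("click-7"));
    }
}
```

In this sketch addOperator takes a SimpleOperator rather than an Operator, which is one way to express "no nested routers" in the type system. Widening the parameter to Operator would be the only change needed if nesting turns out to be useful.
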
samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/rules/ProjectableStreamScanRule.java
<https://reviews.apache.org/r/33280/#comment134212>

    doc: Could you give one example of how this rule is used?


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134217>

    How about array and map?


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134218>

    Do we really need Schema.Type.NULL?


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134221>

    Wouldn't this be an infinite recursion?


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134228>

    For Fixed and Enum, I don't see how we can convert back from RelDataType. We may as well drop support for these types.


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134229>

    Same here.


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java
<https://reviews.apache.org/r/33280/#comment134263>

    We don't have enum support in the generic Schema class either.


samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/task/StreamSqlTask.java
<https://reviews.apache.org/r/33280/#comment134269>

    I would prefer that the physical plan be generated in a config format, so that the StreamTask can instantiate the OperatorRouter from a config. That way, the parser/planner can live only in the deployment console and does not need to be installed in each Samza container. But this could be done later.


samza-sql-core/src/main/java/org/apache/samza/sql/Utils.java
<https://reviews.apache.org/r/33280/#comment134270>

    Do we absolutely need it in samza-sql-core?

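On the infinite-recursion question for AvroSchemaUtils.java above: one common way a recursive schema walk fails to terminate is a record type that refers back to itself (for example, a linked-list node). Whether that is the case in the patch is an assumption on my side. The sketch below shows the usual guard, tracking the records currently being visited, using a hypothetical RecordSchema stand-in rather than Avro's own Schema class.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: RecordSchema is a stand-in for a record type, not Avro's Schema.
final class RecursiveSchemaSketch {

    /** A named record whose field types are either primitive names (String) or records. */
    static final class RecordSchema {
        final String name;
        final Map<String, Object> fields = new LinkedHashMap<>();

        RecordSchema(String name) {
            this.name = name;
        }
    }

    /** Renders a schema as text, terminating even when a record refers to itself. */
    static String describe(RecordSchema schema) {
        return describe(schema, new HashSet<>());
    }

    private static String describe(Object type, Set<String> inProgress) {
        if (type instanceof String) {
            return (String) type; // primitive type name
        }
        RecordSchema record = (RecordSchema) type;
        // Guard: a record already being visited is emitted as a reference
        // instead of being expanded again.
        if (!inProgress.add(record.name)) {
            return "ref(" + record.name + ")";
        }
        StringBuilder sb = new StringBuilder(record.name).append("{");
        String separator = "";
        for (Map.Entry<String, Object> field : record.fields.entrySet()) {
            sb.append(separator)
              .append(field.getKey())
              .append(":")
              .append(describe(field.getValue(), inProgress));
            separator = ",";
        }
        inProgress.remove(record.name);
        return sb.append("}").toString();
    }

    /** Builds a self-referential Node record: Node { value: int, next: Node }. */
    static RecordSchema linkedListNode() {
        RecordSchema node = new RecordSchema("Node");
        node.fields.put("value", "int");
        node.fields.put("next", node);
        return node;
    }

    public static void main(String[] args) {
        System.out.println(describe(linkedListNode()));
    }
}
```

Without the inProgress set, describe would recurse on the next field until the stack overflows. Rejecting recursive schemas outright with an exception would be an equally reasonable choice if the planner has no way to represent them.
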
samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java
<https://reviews.apache.org/r/33280/#comment134273>

    We should be able to change getFields() to return List<Field>, since all current uses of Schema.getFields() in samza-sql-core just iterate through all fields.


samza-sql-core/src/main/java/org/apache/samza/sql/data/DataUtils.java
<https://reviews.apache.org/r/33280/#comment134275>

    nit: this method does not seem to be used. Can we remove it?


samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java
<https://reviews.apache.org/r/33280/#comment134276>

    This should be:
        Schema type = getSchema(field.schema())


samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TypeAwareOperatorSpec.java
<https://reviews.apache.org/r/33280/#comment134279>

    What about operators with multiple input schemas and multiple output schemas?


samza-sql-core/src/main/java/org/apache/samza/sql/operators/insert/InsertToStreamOp.java
<https://reviews.apache.org/r/33280/#comment134280>

    nit: just a style issue: we don't use assert except in test code.


samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java
<https://reviews.apache.org/r/33280/#comment134282>

    This should assemble a new intermediate tuple and send it downstream via collector.send(...).


samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/ProjectableFilterableStreamScanOp.java
<https://reviews.apache.org/r/33280/#comment134284>

    Question: isn't this operator just a FilterableStreamScanOp? There is no projection expression in the spec.


samza-test/src/main/java/org/apache/samza/test/integration/sql/OrdersStreamFactory.java
<https://reviews.apache.org/r/33280/#comment134288>

    Question: do we need to instantiate this factory class in the Samza container? Or is it only needed for the front-end Calcite parser/planner?

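For the ProjectOp.java comment above, a rough sketch of what "assemble a new tuple and send it downstream" could look like. The Collector interface, the Map-based tuples and the class names are hypothetical stand-ins chosen to keep the example self-contained; the real operator would use the Samza collector and the intermediate tuple type from the patch. The missing-field check also uses an explicit exception rather than assert, in line with the InsertToStreamOp.java style note.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: Collector and the Map-based tuple are stand-ins,
// not the Samza collector or the patch's intermediate tuple class.
final class ProjectSketch {

    /** Minimal downstream sink. */
    interface Collector {
        void send(Map<String, Object> tuple);
    }

    /** Keeps only the configured fields of each incoming tuple. */
    static final class ProjectOp {
        private final List<String> projectedFields;

        ProjectOp(List<String> projectedFields) {
            this.projectedFields = new ArrayList<>(projectedFields);
        }

        void process(Map<String, Object> input, Collector collector) {
            // Assemble a new tuple instead of mutating or forwarding the input.
            Map<String, Object> output = new LinkedHashMap<>();
            for (String field : projectedFields) {
                // Explicit check rather than assert, which is disabled by default at runtime.
                if (!input.containsKey(field)) {
                    throw new IllegalArgumentException("Missing field: " + field);
                }
                output.put(field, input.get(field));
            }
            collector.send(output);
        }
    }

    /** Projects one sample order onto (orderId, units) and returns what was sent. */
    static List<Map<String, Object>> runDemo() {
        Map<String, Object> order = new LinkedHashMap<>();
        order.put("orderId", 1);
        order.put("productId", 10);
        order.put("units", 5);

        List<String> fields = new ArrayList<>();
        fields.add("orderId");
        fields.add("units");

        List<Map<String, Object>> sent = new ArrayList<>();
        new ProjectOp(fields).process(order, sent::add);
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```
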
- Yi Pan (Data Infrastructure)


On May 6, 2015, 2:53 p.m., Milinda Pathirage wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33280/
> -----------------------------------------------------------
> 
> (Updated May 6, 2015, 2:53 p.m.)
> 
> 
> Review request for samza, Guozhang Wang and Yi Pan (Data Infrastructure).
> 
> 
> Bugs: SAMZA-561
>     https://issues.apache.org/jira/browse/SAMZA-561
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> This patch contains an initial query execution planner implementation based on Apache Calcite.
> 
> - Basic 'insert into' and 'where' clause support
> - Doesn't support projections, windowing and aggregates. They will be added later.
> 
> 
> Diffs
> -----
> 
>   build.gradle a042567 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/Utils.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/ExecutionPlanner.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/QueryPlanner.java e1c22e9 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/RexToJavaCompiler.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/rules/FilterableStreamScanRule.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/rules/ProjectableStreamScanRule.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/planner/rules/RemoveIdentityProjectRule.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/rel/ProjectableFilterableStreamScan.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/rel/StreamScan.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaConverter.java 705c0ff 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/AvroSchemaUtils.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/RelDataTypeUtils.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/SamzaStreamType.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/schema/Stream.java PRE-CREATION 
>   samza-sql-calcite/src/main/java/org/apache/samza/sql/calcite/task/StreamSqlTask.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/SamzaStreamTableFactory.java fd87aa5 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestExecutionPlanner.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestQueryPlanner.java 0bb15b2 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/planner/TestRexToJavaCompiler.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java fbb5c59 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaUtils.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/test/Constants.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/test/OrderStreamTableFactory.java PRE-CREATION 
>   samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/test/Utils.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/Utils.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Field.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 1e8f192 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/expressions/Expression.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java 96385e2 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/DataUtils.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IntermediateMessageTuple.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java 577cf74 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java aad18f4 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java 348fc0c 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java 916b166 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java 93d4ebb 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TypeAwareOperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/insert/InsertToStreamOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/insert/InsertToStreamSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/project/ProjectSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/ProjectableFilterableStreamScanOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/ProjectableFilterableStreamScanSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/scan/StreamScanSpec.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java 7412669 
>   samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/TestSqlAvroSerde.java PRE-CREATION 
>   samza-sql-core/src/test/resources/orders.avsc PRE-CREATION 
>   samza-test/src/main/config/sql-filter.properties PRE-CREATION 
>   samza-test/src/main/java/org/apache/samza/test/integration/sql/OrdersStreamFactory.java PRE-CREATION 
>   samza-test/src/main/python/integration_tests.py df64e23 
>   samza-test/src/main/python/requirements.txt 2ae9590 
>   samza-test/src/main/python/tests/sql_tests.py PRE-CREATION 
>   samza-test/src/main/resources/orders.avsc PRE-CREATION 
>   samza-test/src/main/resources/orders.json PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/33280/diff/
> 
> 
> Testing
> -------
> 
> * ./bin/check-all.sh passed.
> * Integration tests passed including new streaming sql integration test.
> 
> 
> Thanks,
> 
> Milinda Pathirage
> 
>