wuchong commented on a change in pull request #12577:
URL: https://github.com/apache/flink/pull/12577#discussion_r439899203
##########
File path: docs/dev/table/sql/queries.md
##########
@@ -134,16 +131,88 @@ t_env.connect(FileSystem().path("/path/to/file")))
                  .field("amount", DataTypes.BIGINT()))
     .create_temporary_table("RubberOrders")
 
-# run a SQL update query on the Table and emit the result to the TableSink
+# run an INSERT SQL on the Table and emit the result to the TableSink
 table_env \
-    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 </div>
 
 {% top %}
 
-## Supported Syntax
+## Execute SELECT statement
+A SELECT statement can be executed to collect the content to local through the `TableEnvironment.executeSql()` method. The method returns the result of the SELECT statement as a `TableResult`. Similar to SELECT statement, a `Table` can also be executed to collect to content of the table to local through the `Table.execute()` method.
+`TableResult.collect()` method returns a closeable row iterator. The select job will not be finished unless all result data has been collected. We should actively close the job to avoid resource leak through the `CloseableIterator#close()` method.
+We can print the select result to client console through the `TableResult.print()` method. But we should make sure all result data to print should be small, because all data will be collected to local memory first, and then print them to console.
+
+**Notes:** Only append-only streaming SELECT statement is supported now.

Review comment:
```suggestion
**Notes:** For streaming mode, only append-only query is supported now.
```
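For illustration, a minimal Java sketch of the collect-and-close pattern described in the quoted section, assuming the APIs named there (`executeSql`, `TableResult.collect`, `CloseableIterator#close`). The `Orders` table, its schema, and the `datagen` connector are placeholders, not part of this PR:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CollectExample {
    public static void main(String[] args) throws Exception {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // placeholder source table; any append-only source works here
        tableEnv.executeSql(
            "CREATE TABLE Orders (product STRING, amount INT) WITH ('connector' = 'datagen')");

        // the SELECT job starts when executeSql is called and keeps running
        // until all result rows are consumed or the iterator is closed
        TableResult result = tableEnv.executeSql("SELECT product, amount FROM Orders");

        try (CloseableIterator<Row> it = result.collect()) {
            int count = 0;
            while (it.hasNext() && count < 10) {
                System.out.println(it.next());
                count++;
            }
        } // closing the iterator cancels the job if it is still running, releasing its resources
    }
}
```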
##########
File path: docs/dev/table/common.md
##########
@@ -1039,7 +1032,9 @@ val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](ta
 </div>
 </div>
 
-**Note:** A detailed discussion about dynamic tables and their properties is given in the [Dynamic Tables](streaming/dynamic_tables.html) document.
+**Note:** A detailed discussion about dynamic tables and their properties is given in the [Dynamic Tables](streaming/dynamic_tables.html) document.
+
+<span class="label label-danger">Attention</span> **Once the Table is converted to a DataStream, we must use the StreamExecutionEnvironment.execute method to execute the DataStream program.**

Review comment:
```suggestion
<span class="label label-danger">Attention</span> **Once the Table is converted to a DataStream, please use the `StreamExecutionEnvironment.execute` method to execute the DataStream program.**
```
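To make the attention note above concrete, a short hedged sketch of converting a `Table` to a `DataStream` and then triggering the pipeline via `StreamExecutionEnvironment.execute`. The table name, schema, and `datagen` connector are made up for the example:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ToDataStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // placeholder source table for the example
        tableEnv.executeSql(
            "CREATE TABLE Orders (product STRING, amount INT) WITH ('connector' = 'datagen')");
        Table table = tableEnv.sqlQuery("SELECT product, amount FROM Orders");

        // once the Table is converted, this is a DataStream program ...
        DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
        stream.print();

        // ... so it must be triggered with StreamExecutionEnvironment.execute
        env.execute("table-to-datastream example");
    }
}
```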
##########
File path: docs/dev/table/sql/explain.md
##########
@@ -0,0 +1,190 @@
+---
+title: "EXPLAIN Statements"
+nav-parent_id: sql
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+EXPLAIN statements are used to explain the logical and optimized query plans of a SELECT statement or DML statement.
+
+
+## Run an EXPLAIN statement
+
+EXPLAIN statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns explain result for a successful EXPLAIN operation, otherwise will throw an exception.
+
+The following examples show how to run an EXPLAIN statement in `TableEnvironment` and in SQL CLI.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// register a table named "Orders"
+tEnv.executeSql("CREATE TABLE MyTable1 (count bigint, work VARCHAR(256) WITH (...)");
+tEnv.executeSql("CREATE TABLE MyTable2 (count bigint, work VARCHAR(256) WITH (...)");
+
+// explain SELECT statement through TableEnvironment.explainSql()
+String explanation = tEnv.explainSql(
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " +
+  "SELECT count, word FROM MyTable2");
+System.out.println(explanation);
+
+// explain SELECT statement through TableEnvironment.executeSql()
+TableResult tableResult = tEnv.executeSql(
+  "EXPLAIN PLAN FOR " +
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " +
+  "SELECT count, word FROM MyTable2");
+tableResult.print();
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tEnv = StreamTableEnvironment.create(env)
+
+// register a table named "Orders"
+tEnv.executeSql("CREATE TABLE MyTable1 (count bigint, work VARCHAR(256) WITH (...)")
+tEnv.executeSql("CREATE TABLE MyTable2 (count bigint, work VARCHAR(256) WITH (...)")
+
+// explain SELECT statement through TableEnvironment.explainSql()
+val explanation = tEnv.explainSql(
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " +
+  "SELECT count, word FROM MyTable2")
+println(explanation)
+
+// explain SELECT statement through TableEnvironment.executeSql()
+val tableResult = tEnv.executeSql(
+  "EXPLAIN PLAN FOR " +
+  "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' " +
+  "UNION ALL " +
+  "SELECT count, word FROM MyTable2")
+tableResult.print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
+
+t_env.execute_sql("CREATE TABLE MyTable1 (count bigint, work VARCHAR(256) WITH (...)")
+t_env.execute_sql("CREATE TABLE MyTable2 (count bigint, work VARCHAR(256) WITH (...)")
+
+# explain SELECT statement through TableEnvironment.explain_sql()
+explanation1 = t_env.explain_sql(
+    "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' "
+    "UNION ALL "
+    "SELECT count, word FROM MyTable2")
+print(explanation1)
+
+# explain SELECT statement through TableEnvironment.execute_sql()
+table_result = t_env.execute_sql(
+    "EXPLAIN PLAN FOR "
+    "SELECT count, word FROM MyTable1 WHERE word LIKE 'F%' "
+    "UNION ALL "
+    "SELECT count, word FROM MyTable2")
+table_result.print()
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+Flink SQL> CREATE TABLE MyTable1 (count bigint, work VARCHAR(256);
+[INFO] Table has been created.
+
+Flink SQL> CREATE TABLE MyTable2 (count bigint, work VARCHAR(256);
+[INFO] Table has been created.
+
+Flink SQL> EXPLAIN PLAN FOR SELECT count, word FROM MyTable1 WHERE word LIKE 'F%'
+> UNION ALL
+> SELECT count, word FROM MyTable2;
+
+{% endhighlight %}
+</div>
+</div>
+
+The `EXPLAIN` result is:
+<div>
+{% highlight text %}
+== Abstract Syntax Tree ==
+LogicalUnion(all=[true])
+  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+    FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+  FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
+
+
+== Optimized Logical Plan ==
+DataStreamUnion(all=[true], union all=[count, word])
+  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
+    TableSourceScan(table=[[default_catalog, default_database, MyTable1]], fields=[count, word])
+  TableSourceScan(table=[[default_catalog, default_database, MyTable2]], fields=[count, word])
+
+== Physical Execution Plan ==
+Stage 1 : Data Source
+	content : collect elements with CollectionInputFormat
+
+Stage 2 : Data Source
+	content : collect elements with CollectionInputFormat
+
+	Stage 3 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
+
+	Stage 4 : Operator
+		content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
+		ship_strategy : FORWARD
+
+	Stage 5 : Operator
+		content : from: (count, word)
+		ship_strategy : REBALANCE
+{% endhighlight %}
+</div>
+
+Similar to `EXPLAIN PLAN FOR SELECT ...`, we can also use `EXPLAIN PLAN FOR INSERT ...` statement to get the plan for INSERT statement.
+
+{% top %}
+
+## Syntax
+
+{% highlight sql %}
+EXPLAIN PLAN
+[EXCLUDING ATTRIBUTES | INCLUDING ALL ATTRIBUTES]
+[WITH TYPE | WITH IMPLEMENTATION | WITHOUT IMPLEMENTATION]
+[AS XML | AS JSON]
+FOR
+<select_statement_or_insert_statement>
+
+{% endhighlight %}
+
+For select syntax, please refer to [SELECT]({{ site.baseurl }}/dev/table/sql/queries.html#supported-syntax) page.
+For insert syntax, please refer to [INSERT]({{ site.baseurl }}/dev/table/sql/insert.html) page.
+
+Currently, only `EXPLAIN PLAN FOR <select_or_insert_statement>` is supported.

Review comment:
I would prefer not to list the unsupported syntax (`EXCLUDING ATTRIBUTES`, etc.), because it doesn't help users at all and only confuses them. Then we can remove the above line as well.
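Since the quoted page says `EXPLAIN PLAN FOR INSERT ...` works the same way, a hedged Java fragment of what that might look like. It continues the quoted Java example (reusing its `tEnv`, `MyTable1`, and `MyTable2`), which is an assumption of this sketch:

```java
// continuation of the quoted Java example: explain an INSERT instead of a SELECT
TableResult insertExplain = tEnv.executeSql(
    "EXPLAIN PLAN FOR " +
    "INSERT INTO MyTable2 SELECT count, word FROM MyTable1 WHERE word LIKE 'F%'");
insertExplain.print();
```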
##########
File path: docs/dev/table/sql/insert.md
##########
@@ -55,27 +75,65 @@ val settings = EnvironmentSettings.newInstance()...
 val tEnv = TableEnvironment.create(settings)
 
 // register a source table named "Orders" and a sink table named "RubberOrders"
-tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
-tEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
 
-// run a SQL update query on the registered source table and emit the result to registered sink table
-tEnv.sqlUpdate(
+// run a single INSERT query on the registered source table and emit the result to registered sink table
+val tableResult1 = tEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+// get job status through TableResult
+println(tableResult1.getJobClient().get().getJobStatus())
+
+//----------------------------------------------------------------------------
+// register another sink table named "GlassOrders" for multiple INSERT queries
+tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
+
+// run multiple INSERT queries on the registered source table and emit the result to registered sink tables
+val stmtSet = tEnv.createStatementSet()
+// only single INSERT query can be accepted by `addInsertSql` method
+stmtSet.addInsertSql(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+stmtSet.addInsertSql(
+  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
+// execute all statements as a batch

Review comment:
`// execute all statements together`? `batch` may confuse users.
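For reference, a hedged Java sketch of running such a statement set end to end with `StatementSet.execute()`, the step the comment above is about. The class name and the `datagen`/`blackhole` connectors are placeholders standing in for the tables in the quoted snippet:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

public class StatementSetExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // placeholder tables standing in for Orders / RubberOrders / GlassOrders
        tEnv.executeSql("CREATE TABLE Orders (product STRING, amount INT) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE RubberOrders (product STRING, amount INT) WITH ('connector' = 'blackhole')");
        tEnv.executeSql("CREATE TABLE GlassOrders (product STRING, amount INT) WITH ('connector' = 'blackhole')");

        // buffer multiple INSERT statements in one statement set
        StatementSet stmtSet = tEnv.createStatementSet();
        stmtSet.addInsertSql(
            "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
        stmtSet.addInsertSql(
            "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");

        // execute all buffered statements together; they are submitted as a single Flink job
        TableResult result = stmtSet.execute();
        result.getJobClient().ifPresent(client -> System.out.println("Submitted job: " + client.getJobID()));
    }
}
```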
##########
File path: docs/try-flink/table_api.md
##########
@@ -123,11 +123,14 @@ tEnv.registerTableSource("transactions", new BoundedTransactionTableSource());
 tEnv.registerTableSink("spend_report", new SpendReportTableSink());
 tEnv.registerFunction("truncateDateToHour", new TruncateDateToHour());
 
-tEnv
-    .scan("transactions")
-    .insertInto("spend_report");
-
-env.execute("Spend Report");
+// build table program
+Table table = tEnv.scan("transactions");
+
+// trigger execution
+TableResult tableResult = table.executeInsert("spend_report");

Review comment:
```
TableResult tableResult = tEnv
    .scan("transactions")
    .executeInsert("spend_report");
```
Keep it the same as the archetype code? I also prefer this one because it's more concise as an example.

##########
File path: docs/dev/table/sql/use.md
##########
@@ -0,0 +1,191 @@
+---
+title: "USE Statements"
+nav-parent_id: sql
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+USE statements are used to set the default database or catalog.
+
+
+## Run a USE statement
+
+USE statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful USE operation, otherwise will throw an exception.
+
+The following examples show how to run a USE statement in `TableEnvironment` and in SQL CLI.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
+tEnv.executeSql("SHOW CATALOGS").print();
+// +-----------------+
+// | result          |
+// +-----------------+
+// | default_catalog |
+// | cat1            |
+// +-----------------+
+
+// change default catalog
+tEnv.executeSql("USE CATALOG cat1");
+
+tEnv.executeSql("SHOW DATABASES").print();
+// databases are empty
+// +--------+
+// | result |
+// +--------+
+// +--------+
+
+// create a database
+tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
+tEnv.executeSql("SHOW DATABASES").print();
+// +--------+
+// | result |
+// +--------+
+// | db1    |
+// +--------+
+
+// change default database
+tEnv.executeSql("USE db1");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val tEnv = StreamTableEnvironment.create(env)
+
+// create a catalog
+tEnv.executeSql("CREATE CATALOG cat1 WITH (...)")
+tEnv.executeSql("SHOW CATALOGS").print()
+// +-----------------+
+// | result          |
+// +-----------------+
+// | default_catalog |
+// | cat1            |
+// +-----------------+
+
+// change default catalog
+tEnv.executeSql("USE CATALOG cat1")
+
+tEnv.executeSql("SHOW DATABASES").print()
+// databases are empty
+// +--------+
+// | result |
+// +--------+
+// +--------+
+
+// create a database
+tEnv.executeSql("CREATE DATABASE db1 WITH (...)")
+tEnv.executeSql("SHOW DATABASES").print()
+// +--------+
+// | result |
+// +--------+
+// | db1    |
+// +--------+
+
+// change default database
+tEnv.executeSql("USE db1")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
+
+# create a catalog
+table_env.execute_sql("CREATE CATALOG cat1 WITH (...)")
+table_env.execute_sql("SHOW CATALOGS").print()
+# +-----------------+
+# | result          |
+# +-----------------+
+# | default_catalog |
+# | cat1            |
+# +-----------------+
+
+# change default catalog
+table_env.execute_sql("USE CATALOG cat1")
+
+table_env.execute_sql("SHOW DATABASES").print()
+# databases are empty
+# +--------+
+# | result |
+# +--------+
+# +--------+
+
+# create a database
+table_env.execute_sql("CREATE DATABASE db1 WITH (...)")
+table_env.execute_sql("SHOW DATABASES").print()
+# +--------+
+# | result |
+# +--------+
+# | db1    |
+# +--------+
+
+# change default database
+table_env.execute_sql("USE db1")
+
+{% endhighlight %}
+</div>
+
+<div data-lang="SQL CLI" markdown="1">
+{% highlight sql %}
+Flink SQL> CREATE CATALOG cat1 WITH (...);
+[INFO] Catalog has been created.
+
+Flink SQL> SHOW CATALOGS;
+default_catalog
+cat1
+
+Flink SQL> USE CATALOG cat1;
+
+Flink SQL> SHOW DATABASES;
+
+Flink SQL> CREATE DATABASE db1 WITH (...);
+[INFO] Database has been created.
+
+Flink SQL> SHOW DATABASES;
+db1
+
+Flink SQL> USE db1;
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## Syntax
+
+{% highlight sql %}
+USE CATALOG catalog_name
+
+USE [catalog_name.]database_name
+{% endhighlight %}

Review comment:
Split them into two sections, `USE CATALOG` and `USE DATABASE`? And add a description under the syntax, e.g.:
```
Set the current database. All subsequent commands that do not explicitly specify a database will use this one. If the provided database does not exist, an exception is thrown. The default current database is `default_database`.
```
##########
File path: docs/dev/table/sql/use.md
##########
@@ -0,0 +1,191 @@
+---
+title: "USE Statements"
+nav-parent_id: sql
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+USE statements are used to set the default database or catalog.

Review comment:
`default` -> `current`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org