sjwiesman commented on a change in pull request #10669: [FLINK-15192][docs][table] Restructure "SQL" pages for better readability URL: https://github.com/apache/flink/pull/10669#discussion_r361867102
########## File path: docs/dev/table/sql/create.md ########## @@ -0,0 +1,240 @@ +--- +title: "CREATE Statements" +nav-parent_id: sql +nav-pos: 2 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +* This will be replaced by the TOC +{:toc} + +CREATE statements are used to register a table/view/function into current or specified [Catalog]({{ site.baseurl }}/dev/table/catalogs.html). A registered table/view/function can be used in SQL queries. + +Flink SQL supports the following CREATE statements for now: + +- CREATE TABLE +- CREATE VIEW +- CREATE FUNCTION +- CREATE DATABASE + +## Run a CREATE statement + +CREATE statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful CREATE operation, otherwise will throw an exception. + +The following examples show how to run a CREATE statement in `TableEnvironment`. + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +EnvironmentSettings settings = EnvironmentSettings.newInstance()... 
+TableEnvironment tableEnv = TableEnvironment.create(settings); + +// SQL query with a registered table +// register a table named "Orders" +tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)"); +// run a SQL query on the Table and retrieve the result as a new Table +Table result = tableEnv.sqlQuery( + "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); + +// SQL update with a registered table +// register a TableSink +tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)"); +// run a SQL update query on the Table and emit the result to the TableSink +tableEnv.sqlUpdate( + "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val settings = EnvironmentSettings.newInstance()... +val tableEnv = TableEnvironment.create(settings) + +// SQL query with a registered table +// register a table named "Orders" +tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)"); +// run a SQL query on the Table and retrieve the result as a new Table +val result = tableEnv.sqlQuery( + "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); + +// SQL update with a registered table +// register a TableSink +tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)"); +// run a SQL update query on the Table and emit the result to the TableSink +tableEnv.sqlUpdate( + "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") +{% endhighlight %} +</div> + +<div data-lang="python" markdown="1"> +{% highlight python %} +settings = EnvironmentSettings.newInstance()... 
+table_env = TableEnvironment.create(settings) + +# SQL query with a registered table +# register a table named "Orders" +tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)"); +# run a SQL query on the Table and retrieve the result as a new Table +result = tableEnv.sqlQuery( + "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'"); + +# SQL update with a registered table +# register a TableSink +table_env.sql_update("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)") +# run a SQL update query on the Table and emit the result to the TableSink +table_env \ + .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'") +{% endhighlight %} +</div> +</div> + +The following examples show how to run a CREATE statement in SQL CLI. + +{% highlight sql %} +Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...); +[INFO] Table has been created. + +Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...); +[INFO] Table has been created. + +Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'; +[INFO] Submitting SQL update statement to the cluster... +{% endhighlight %} + + +{% top %} + +## CREATE TABLE + +{% highlight sql %} +CREATE TABLE [catalog_name.][db_name.]table_name + ( + { <column_definition> | <computed_column_definition> }[ , ...n] + [ <watermark_definition> ] + ) + [COMMENT table_comment] + [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] + WITH (key1=val1, key2=val2, ...) + +<column_definition>: + column_name column_type [COMMENT column_comment] + +<computed_column_definition>: + column_name AS computed_column_expression [COMMENT column_comment] + +<watermark_definition>: + WATERMARK FOR rowtime_column_name AS watermark_strategy_expression + +{% endhighlight %} + +Creates a table with the given name. 
If a table with the same name already exists in the catalog, an exception is thrown. + +**COMPUTED COLUMN** + +Column declared with syntax "`column_name AS computed_column_expression`" is a computed column. A computed column is a virtual column that is not physically stored in the table. The column is computed from an non-query expression that uses other columns in the same table. For example, a computed column can have the definition: `cost AS price * qty`. The expression can be a noncomputed column name, constant, (user-defined/system) function, variable, and any combination of these connected by one or more operators. The expression cannot be a subquery. + +Computed column is introduced to Flink for defining [time attributes]({{ site.baseurl}}/dev/table/streaming/time_attributes.html) in CREATE TABLE statement. +A [processing time attribute]({{ site.baseurl}}/dev/table/streaming/time_attributes.html#processing-time) can be defined easily via `proc AS PROCTIME()` using the system `PROCTIME()` function. +On the other hand, computed column can be used to derive event time column because an event time column may need to be derived from existing fields, e.g. the original field is not `TIMESTAMP(3)` type or is nested in a JSON string. + +Notes: + +- A computed column defined on a source table is computed after reading from the source, it can be used in the following SELECT query statements. +- A computed column cannot be the target of an INSERT statement. In INSERT statement, the schema of SELECT clause should match the schema of target table without computed columns. + + +**WATERMARK** Review comment: The `WATERMARK` defines the event time attributes of a table and takes the form `WATERMARK FOR rowtime_column_name AS watermark_strategy_expression`. The `rowtime_column_name` defines an existing column that is marked as the event time attribute of the table. The column must be of type `TIMESTAMP(3)` and be a top-level column in the schema. It may be a computed column. 
The `watermark_strategy_expression` defines the watermark generation strategy. It allows arbitrary non-query expressions, including computed columns, to calculate the watermark. The expression return type must be `TIMESTAMP(3)`, which represents the timestamp since the Epoch.

When using event time semantics, tables must contain an event time attribute and a watermarking strategy.

Flink provides several commonly used watermark strategies.

- Strictly ascending timestamps: `WATERMARK FOR rowtime_column AS rowtime_column`

  Emits a watermark of the maximum observed timestamp so far. Rows that have a timestamp greater than the max timestamp are not late.

- Ascending timestamps: `WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND`

  Emits a watermark of the maximum observed timestamp so far minus 1 millisecond. Rows that have a timestamp greater than or equal to the max timestamp are not late.

- Bounded out-of-orderness timestamps: `WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit`

  Emits watermarks that are the maximum observed timestamp minus the specified delay, e.g., `WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND` is a 5-second delayed watermark strategy.

{% highlight sql %}
CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
{% endhighlight %}
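To make the difference between the three strategies easier to check, here is a minimal Python sketch. It is not Flink code and uses no Flink API. The function name `classify_rows` is hypothetical, and the sketch rests on two simplifying assumptions: the watermark is recomputed after every row (Flink emits watermarks periodically in practice), and a row counts as late when its timestamp is less than or equal to the watermark in effect when it arrives.

```python
from typing import List, Optional, Tuple


def classify_rows(timestamps_ms: List[int], delay_ms: int) -> List[Tuple[int, bool]]:
    """Return (timestamp, is_late) for each row in arrival order.

    Assumption: watermark = max observed timestamp - delay_ms, updated per row.
    Assumption: a row is late if timestamp <= watermark seen before the row.
    """
    results: List[Tuple[int, bool]] = []
    max_ts: Optional[int] = None  # no rows observed yet, so no watermark yet
    for ts in timestamps_ms:
        watermark = None if max_ts is None else max_ts - delay_ms
        is_late = watermark is not None and ts <= watermark
        results.append((ts, is_late))
        max_ts = ts if max_ts is None else max(max_ts, ts)
    return results


if __name__ == "__main__":
    rows = [1000, 1000, 999, 2000]  # event timestamps in milliseconds
    # delay 0    ~ rowtime_column                              (strictly ascending)
    # delay 1    ~ rowtime_column - INTERVAL '0.001' SECOND    (ascending)
    # delay 5000 ~ rowtime_column - INTERVAL '5' SECOND        (bounded out-of-orderness)
    for label, delay in [("strictly ascending", 0), ("ascending", 1), ("bounded 5s", 5000)]:
        print(label, classify_rows(rows, delay))
```

With a delay of 0 a repeated timestamp is already late, with a delay of 1 ms it is still accepted, and with a 5 second delay none of the sample rows are late.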