danny0405 commented on a change in pull request #10669: [FLINK-15192][docs][table] Restructure "SQL" pages for better readability
URL: https://github.com/apache/flink/pull/10669#discussion_r361578634
########## File path: docs/dev/table/sql/ddl.md ##########

@@ -0,0 +1,297 @@

---
title: "Data Definition Language (DDL)"
nav-title: "Data Definition Language"
nav-parent_id: sql
nav-pos: 1
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

* This will be replaced by the TOC
{:toc}

DDL statements are specified with the `sqlUpdate()` method of the `TableEnvironment`. The method returns nothing for a successful create/drop/alter database or table operation. A `CREATE TABLE` statement registers a catalog table in the [Catalog]({{ site.baseurl }}/dev/table/catalogs.html), which can then be referenced in SQL queries.

Flink SQL DDL statements documented here include:

- CREATE TABLE, VIEW, DATABASE, FUNCTION
- DROP TABLE, VIEW, DATABASE, FUNCTION
- ALTER TABLE, DATABASE

## Run a DDL

The following examples show how to run a SQL DDL in a `TableEnvironment`.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
EnvironmentSettings settings = EnvironmentSettings.newInstance()...
TableEnvironment tableEnv = TableEnvironment.create(settings);

// SQL query with a registered table
// register a table named "Orders"
tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
// run a SQL query on the Table and retrieve the result as a new Table
Table result = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// SQL update with a registered table
// register a TableSink
tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val settings = EnvironmentSettings.newInstance()...
val tableEnv = TableEnvironment.create(settings)

// SQL query with a registered table
// register a table named "Orders"
tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)")
// run a SQL query on the Table and retrieve the result as a new Table
val result = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

// SQL update with a registered table
// register a TableSink
tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH ('connector.path'='/path/to/file' ...)")
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight python %}
settings = EnvironmentSettings.new_instance()...
table_env = TableEnvironment.create(settings)

# SQL update with a registered table
# register a TableSink
table_env.sql_update("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)")
# run a SQL update query on the Table and emit the result to the TableSink
table_env \
    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}
</div>
</div>

{% top %}

## Table DDL

### CREATE TABLE

{% highlight sql %}
CREATE TABLE [catalog_name.][db_name.]table_name
  (
    { <column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)

<column_definition>:
  column_name column_type [COMMENT column_comment]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
{% endhighlight %}

Creates a table with the given name. If a table with the same name already exists in the catalog, an exception is thrown.

**COMPUTED COLUMN**

A column declared with the syntax "`column_name AS computed_column_expression`" is a computed column. A computed column is a virtual column that is not physically stored in the table; its value is computed from a non-query expression that uses other columns of the same table. For example, a computed column can have the definition `cost AS price * qty`. The expression can be a non-computed column name, a constant, a (user-defined or system) function, a variable, or any combination of these connected by one or more operators. The expression cannot be a subquery.

Computed columns are introduced in Flink for defining [time attributes]({{ site.baseurl }}/dev/table/streaming/time_attributes.html) in the CREATE TABLE DDL.
A [processing time attribute]({{ site.baseurl }}/dev/table/streaming/time_attributes.html#processing-time) can be defined easily via `proc AS PROCTIME()` using the system `PROCTIME()` function.
A computed column can also be used to derive an event time column, because an event time column may need to be derived from existing fields, e.g. when the original field is not of type `TIMESTAMP(3)` or is nested in a JSON string. A small example is sketched after the notes below.

Notes:

- A computed column defined on a source table is computed after reading from the source; it can be used in subsequent SELECT query statements.
- A computed column cannot be the target of an INSERT statement; the schema of the INSERT statement's SELECT clause must match the schema of the sink table without its computed columns.
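To make the computed-column rules above concrete, here is a minimal sketch of a `CREATE TABLE` statement that declares both a derived column and a processing time attribute. The table name, field names, and the elided connector options are illustrative assumptions, not part of the patch under review.

{% highlight sql %}
-- Hypothetical example: `cost` and `proc` are computed columns.
-- They are evaluated after reading from the source and are not part of the
-- physical schema, so an INSERT INTO MyOrders only supplies order_id, price and qty.
CREATE TABLE MyOrders (
  order_id BIGINT,
  price    DOUBLE,
  qty      INT,
  cost AS price * qty,   -- derived from other columns of the same table
  proc AS PROCTIME()     -- processing time attribute via the system PROCTIME() function
) WITH (
  ...                    -- connector properties omitted, as in the examples above
)
{% endhighlight %}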
**WATERMARK**

The WATERMARK definition is used to define an [event time attribute]({{ site.baseurl }}/dev/table/streaming/time_attributes.html#event-time) in the CREATE TABLE DDL.

The “`FOR rowtime_column_name`” clause defines which existing column is marked as the event time attribute; the column must be of type `TIMESTAMP(3)`, must be a top-level column in the schema, and may be a computed column.

The “`AS watermark_strategy_expression`” clause defines the watermark generation strategy. It allows an arbitrary non-query expression (which may reference computed columns) to calculate the watermark. The expression's return type must be `TIMESTAMP(3)`, which represents the timestamp since the Epoch.

The returned watermark will be emitted only if it is non-null and its value is larger than the previously emitted local watermark (to preserve the contract of ascending watermarks). The watermark generation expression is called by the framework for every record.
The framework will periodically emit the largest generated watermark. If the current watermark is still identical to the previous one, or is null, or the value of the returned watermark is smaller than that of the last emitted one, then no new watermark will be emitted.

Review comment:

`is called` -> `is evaluated`
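For reference, the following sketch shows the WATERMARK clause described above combined with a computed rowtime column. The table, the `TO_TIMESTAMP` conversion from a string field, and the five-second out-of-orderness bound are illustrative assumptions rather than content of the patch.

{% highlight sql %}
-- Hypothetical example: `ts` derives a TIMESTAMP(3) event time column from a string
-- field, and the WATERMARK clause declares a bounded out-of-orderness strategy
-- that tolerates records arriving up to 5 seconds late.
CREATE TABLE UserActions (
  user_name STRING,
  log_ts    STRING,
  ts AS TO_TIMESTAMP(log_ts),                    -- computed rowtime column
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND   -- watermark_strategy_expression
) WITH (
  ...                                            -- connector properties omitted
)
{% endhighlight %}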