sjwiesman commented on a change in pull request #10669: 
[FLINK-15192][docs][table] Restructure "SQL" pages for better readability
URL: https://github.com/apache/flink/pull/10669#discussion_r361867159
 
 

 ##########
 File path: docs/dev/table/sql/create.zh.md
 ##########
 @@ -0,0 +1,240 @@
+---
+title: "CREATE Statements"
+nav-parent_id: sql
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+CREATE statements are used to register a table, view, or function into the current or a specified [Catalog]({{ site.baseurl }}/dev/table/catalogs.html). A registered table, view, or function can be used in SQL queries.
+
+Flink SQL currently supports the following CREATE statements:
+
+- CREATE TABLE
+- CREATE VIEW
+- CREATE FUNCTION
+- CREATE DATABASE
+
+## Run a CREATE statement
+
+CREATE statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or executed in the [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful CREATE operation; otherwise it throws an exception.
+
+The following examples show how to run a CREATE statement in a `TableEnvironment`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+EnvironmentSettings settings = EnvironmentSettings.newInstance()...
+TableEnvironment tableEnv = TableEnvironment.create(settings);
+
+// SQL query with a registered table
+// register a table named "Orders"
+tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount 
INT) WITH (...)");
+// run a SQL query on the Table and retrieve the result as a new Table
+Table result = tableEnv.sqlQuery(
+  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+
+// SQL update with a registered table
+// register a TableSink
+tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH 
(...)");
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product 
LIKE '%Rubber%'");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val settings = EnvironmentSettings.newInstance()...
+val tableEnv = TableEnvironment.create(settings)
+
+// SQL query with a registered table
+// register a table named "Orders"
+tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+// run a SQL query on the Table and retrieve the result as a new Table
+val result = tableEnv.sqlQuery(
+  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+
+// SQL update with a registered table
+// register a TableSink
+tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)")
+// run a SQL update query on the Table and emit the result to the TableSink
+tableEnv.sqlUpdate(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product 
LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+settings = EnvironmentSettings.new_instance()...
+table_env = TableEnvironment.create(settings)
+
+# SQL query with a registered table
+# register a table named "Orders"
+table_env.sql_update("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+# run a SQL query on the Table and retrieve the result as a new Table
+result = table_env.sql_query(
+  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+
+# SQL update with a registered table
+# register a TableSink
+table_env.sql_update("CREATE TABLE RubberOrders(product STRING, amount INT) 
WITH (...)")
+# run a SQL update query on the Table and emit the result to the TableSink
+table_env \
+    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders 
WHERE product LIKE '%Rubber%'")
+{% endhighlight %}
+</div>
+</div>
+
+The following examples show how to run a CREATE statement in the SQL CLI.
+
+{% highlight sql %}
+Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) 
WITH (...);
+[INFO] Table has been created.
+
+Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
+[INFO] Table has been created.
+
+Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE 
product LIKE '%Rubber%';
+[INFO] Submitting SQL update statement to the cluster...
+{% endhighlight %}
+
+
+{% top %}
+
+## CREATE TABLE
+
+{% highlight sql %}
+CREATE TABLE [catalog_name.][db_name.]table_name
+  (
+    { <column_definition> | <computed_column_definition> }[ , ...n]
+    [ <watermark_definition> ]
+  )
+  [COMMENT table_comment]
+  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
+  WITH (key1=val1, key2=val2, ...)
+
+<column_definition>:
+  column_name column_type [COMMENT column_comment]
+
+<computed_column_definition>:
+  column_name AS computed_column_expression [COMMENT column_comment]
+
+<watermark_definition>:
+  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
+
+{% endhighlight %}
+
+Creates a table with the given name. If a table with the same name already 
exists in the catalog, an exception is thrown.
+
+**COMPUTED COLUMN**
+
+A column declared with the syntax "`column_name AS computed_column_expression`" is a computed column. A computed column is a virtual column that is not physically stored in the table. The column is computed from a non-query expression that uses other columns in the same table. For example, a computed column can have the definition `cost AS price * qty`. The expression can be a non-computed column name, a constant, a (user-defined or system) function, a variable, or any combination of these connected by one or more operators. The expression cannot be a subquery.
+
+Computed columns are introduced in Flink to define [time attributes]({{ site.baseurl }}/dev/table/streaming/time_attributes.html) in CREATE TABLE statements.
+A [processing time attribute]({{ site.baseurl }}/dev/table/streaming/time_attributes.html#processing-time) can be defined easily via `proc AS PROCTIME()`, using the system `PROCTIME()` function.
+An event time column, on the other hand, may need to be derived from existing fields, e.g. when the original field is not of type `TIMESTAMP(3)` or is nested in a JSON string; a computed column can perform this derivation.
+
+Notes:
+
+- A computed column defined on a source table is computed after reading from the source; it can be used in subsequent SELECT queries.
+- A computed column cannot be the target of an INSERT statement. In an INSERT statement, the schema of the SELECT clause should match the schema of the target table without its computed columns.
+
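+For example, the following statement declares the computed column from the definition above together with a processing time attribute (connector properties are elided as in the other examples on this page):
+
+{% highlight sql %}
+CREATE TABLE Orders (
+  `user` BIGINT,
+  product STRING,
+  price DOUBLE,
+  qty INT,
+  -- computed column derived from other columns of the same table
+  cost AS price * qty,
+  -- processing time attribute defined via the system PROCTIME() function
+  proc AS PROCTIME()
+) WITH (...)
+{% endhighlight %}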
+
+**WATERMARK**
+
+The WATERMARK definition is used to define the [event time attribute]({{ site.baseurl }}/dev/table/streaming/time_attributes.html#event-time) of a table in a CREATE TABLE statement.
+
+The “`FOR rowtime_column_name`” clause defines which existing column is marked as the event time attribute. The column must be of type `TIMESTAMP(3)`, must be a top-level column in the schema, and may be a computed column.
+
+The “`AS watermark_strategy_expression`” clause defines the watermark generation strategy. It allows an arbitrary non-query expression (which may reference computed columns) to calculate the watermark. The expression's return type must be `TIMESTAMP(3)`, representing a timestamp since the Epoch.
+
+The watermark generation expression is evaluated by the framework for every record. A generated watermark is emitted only if it is non-null and its value is larger than the previously emitted local watermark (to preserve the contract of ascending watermarks).
+The framework periodically emits the largest watermark generated so far, at the interval defined by the [`pipeline.auto-watermark-interval`]({{ site.baseurl }}/ops/config.html#pipeline-auto-watermark-interval) configuration option. If the current watermark is null, identical to the previous one, or smaller than the last emitted one, no new watermark is emitted.
+If the watermark interval is `0ms`, watermarks are emitted per record, again only when non-null and greater than the last emitted one.
+
+For common cases, Flink provides easy-to-use shorthands for frequently used watermark strategies:
+
+- Strictly ascending timestamps: `WATERMARK FOR rowtime_column AS rowtime_column`
+
+  Emits a watermark of the maximum observed timestamp so far. Rows with a timestamp larger than the max timestamp are not late.
+
+- Ascending timestamps: `WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND`
+
+  Emits a watermark of the maximum observed timestamp so far minus 1 millisecond (the smallest watermark granularity). Rows with a timestamp equal to the max timestamp are not late.
+
+- Bounded out-of-orderness timestamps: `WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit`
+
+  Emits watermarks of the maximum observed timestamp minus the specified delay, e.g. `WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND` is a watermark strategy with a 5-second delay.
+
+- Preserving assigned watermarks from the source (**Not supported yet**): `WATERMARK FOR rowtime_column AS SYSTEM_WATERMARK()`
+
+  Indicates that the watermarks assigned by the source should be preserved (some sources can generate watermarks themselves).
+
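+As an illustration, the following statement marks a hypothetical `order_time` column as the event time attribute with a bounded out-of-orderness watermark of 5 seconds (connector properties elided):
+
+{% highlight sql %}
+CREATE TABLE Orders (
+  `user` BIGINT,
+  product STRING,
+  amount INT,
+  order_time TIMESTAMP(3),
+  -- watermarks lag 5 seconds behind the maximum observed order_time
+  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
+) WITH (...)
+{% endhighlight %}
+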
+**PARTITIONED BY**
+
+Partitions the created table by the specified columns. If this table is used as a filesystem sink, a directory is created for each partition.
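+
+For example, the following statement partitions a table by a hypothetical `dt` column (connector properties elided):
+
+{% highlight sql %}
+CREATE TABLE Orders (
+  `user` BIGINT,
+  product STRING,
+  amount INT,
+  dt STRING
+)
+PARTITIONED BY (dt)
+WITH (...)
+{% endhighlight %}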
+
+**WITH OPTIONS**
+
+Table properties used to create a table source or sink. The properties are usually used to find and create the underlying connector.
+
+Both the key and the value of the expression `key1=val1` must be string literals. See [Connect to External Systems]({{ site.baseurl }}/dev/table/connect.html) for details on the supported table properties of the different connectors.
+
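+For example, a table backed by the filesystem connector could be declared as follows (the exact property keys depend on the chosen connector and format; see the page linked above):
+
+{% highlight sql %}
+CREATE TABLE RubberOrders (
+  product STRING,
+  amount INT
+) WITH (
+  'connector.type' = 'filesystem',
+  'connector.path' = '/path/to/file',
+  'format.type' = 'csv'
+)
+{% endhighlight %}
+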
+**Notes:** The table name can be in three formats: 1. `catalog_name.db_name.table_name` 2. `db_name.table_name` 3. `table_name`. With `catalog_name.db_name.table_name`, the table is registered into the metastore under the catalog named "catalog_name" and the database named "db_name"; with `db_name.table_name`, the table is registered into the current catalog of the execution table environment under the database named "db_name"; with `table_name`, the table is registered into the current catalog and database of the execution table environment.
+
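+For instance, assuming a catalog named `custom_catalog` and a database named `custom_db` already exist (both names are hypothetical), the following statement registers the table under that catalog and database:
+
+{% highlight sql %}
+CREATE TABLE custom_catalog.custom_db.Orders (`user` BIGINT, product STRING, amount INT) WITH (...)
+{% endhighlight %}
+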
+**Notes:** A table registered with a `CREATE TABLE` statement can be used as both a table source and a table sink; whether it acts as a source or a sink is not determined until it is referenced in a DML statement.
+
+{% top %}
+
+
+## CREATE VIEW
+
+*TODO: should add descriptions.*
+
+## CREATE FUNCTION
+
+*TODO: should add descriptions.*
+
 
 Review comment:
   
   Please either fill these in or remove the sections entirely and re-add them 
in a follow up PR.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
