danny0405 commented on a change in pull request #12386:
URL: https://github.com/apache/flink/pull/12386#discussion_r433589932
##########
File path: docs/dev/table/connectors/index.md
##########
@@ -0,0 +1,268 @@
+---
+title: "Table & SQL Connectors"
+nav-id: sql-connectors
+nav-parent_id: connectors-root
+nav-pos: 2
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Flink's Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Avro, Parquet, or ORC.
+
+This page describes how to declare built-in table sources and/or table sinks and register them in Flink. After a source or sink has been registered, it can be accessed by Table API & SQL statements.
+
+<span class="label label-info">NOTE</span> If you want to implement your own *custom* table source or sink, have a look at the [user-defined sources & sinks page](sourceSinks.html).
+
+<span class="label label-danger">Attention</span> Flink Table & SQL introduces a new set of connector options since 1.11.0, if you are using the legacy connector options, please refer to the [legacy documentation]({{ site.baseurl }}/dev/table/connect.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Supported Connectors
+------------
+
+Flink natively support various connectors. The following tables list all available connectors.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Name</th>
+        <th class="text-center">Version</th>
+        <th class="text-center">Source</th>
+        <th class="text-center">Sink</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>Filesystem</td>
+      <td></td>
+      <td>Bounded and Unbounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Elasticsearch</td>
+      <td>6.x & 7.x</td>
+      <td>Not supported</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Apache Kafka</td>
+      <td>0.10+</td>
+      <td>Unbounded Scan</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>JDBC</td>
+      <td></td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache HBase</a></td>
+      <td>1.4.x</td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    </tbody>
+</table>
+
+{% top %}
+
+How to use connectors
+--------
+
+Flink supports to use SQL CREATE TABLE statement to register a table. One can define the name of the table, the schema of the table, the connector options for connecting to an external system.
+
+The following code shows a full example of how to connect to Kafka for reading Json records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  -- declare the schema of the table
+  `user` BIGINT,
+  message STRING,
+  ts TIMESTAMP,
+  proctime AS PROCTIME(), -- use computed column to define proctime attribute
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- use WATERMARK statement to define rowtime attribute
+) WITH (
+  -- declare the external system to connect to
+  'connector' = 'kafka',
+  'topic' = 'topic_name',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'  -- declare a format for this system
+)
+{% endhighlight %}
+</div>
+</div>
+
+In this ways the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly-one matching table factory.
+
+If no factory can be found or multiple factories match for the given properties, an exception will be thrown with additional information about considered factories and supported properties.
+
+{% top %}
+
+Schema Mapping
+------------
+
+The body clause of a SQL `CREATE TABLE` statement defines the names and types of columns, and constraints, watermarks. Flink doesn't hold the data, thus the schema definition only declares how to map types from an external system to Flink’s representation. The mapping may not be mapped by names, it depends on the implementation of formats and connectors. For example, a MySQL database table is mapped by field names (not case sensitive), and a CSV filesystem is mapped by field order (field names can be arbitrary). This will be explanation in every connectors.
+
+The following example shows a simple schema without time attributes and one-to-one field mapping of input/output to table columns.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN
+) WITH (
+  ...
+)
+{% endhighlight %}
</div>
</div>
+
+### Primary Key
+
+Primary key constraints tell that a column or a set of columns of a table are unique and they do not contain null. Primary key therefore uniquely identify a row in a table.
+

Review comment:
   `contain null` -> `contain nulls`. Remove the therefore: `Primary key uniquely identifies`
##########
File path: docs/dev/table/connectors/index.md
##########
+Schema Mapping
+------------
+
+The body clause of a SQL `CREATE TABLE` statement defines the names and types of columns, and constraints, watermarks. Flink doesn't hold the data, thus the schema definition only declares how to map types from an external system to Flink’s representation. The mapping may not be mapped by names, it depends on the implementation of formats and connectors. For example, a MySQL database table is mapped by field names (not case sensitive), and a CSV filesystem is mapped by field order (field names can be arbitrary). This will be explanation in every connectors.
+

Review comment:
   `and constraints, watermarks` -> `constraints and watermarks`
   `The mapping may not be mapped by names` -> `The column names defined in the schema may or may not be the real physical table names`
   `be explanation` -> `be explained`
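(For context, the by-name vs. by-order mapping the quoted paragraph describes could look roughly like the sketch below; the table names, path, and option values here are illustrative and not taken from the PR.)

```sql
-- CSV over the filesystem connector: columns are matched to the file by
-- position, so the names chosen here are arbitrary.
CREATE TABLE CsvUsers (
  id   BIGINT,
  name STRING
) WITH (
  'connector' = 'filesystem',
  'path'      = 'file:///tmp/users.csv',   -- illustrative path
  'format'    = 'csv'
);

-- JDBC/MySQL: columns are matched by field name (case insensitive), so the
-- names must correspond to the columns of the physical `users` table.
CREATE TABLE JdbcUsers (
  id   BIGINT,
  name STRING
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/mydb',
  'table-name' = 'users'
);
```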
##########
File path: docs/dev/table/connectors/index.md
##########
+The primary key of a source table is a metadata information for optimization. The primary key of a sink table is usually used by the sink implementation for upserting.
+
+SQL standard specifies that a constraint can either be ENFORCED or NOT ENFORCED. This controls if the constraint checks are performed on the incoming/outgoing data. Flink does not own the data therefore the only mode we want to support is the NOT ENFORCED mode. Its up to the user to ensure that the query enforces key integrity.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN,
+  PRIMARY KEY (MyField1, MyField2) NOT ENFORCED  -- defines a primary key on columns
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+### Time Attributes
+
+Time attributes are essential when working with unbounded streaming tables. Therefore both processing-time and event-time (also known as "rowtime") attributes can be defined as part of the schema.
+
+For more information about time handling in Flink and especially event-time, we recommend the general [event-time section](streaming/time_attributes.html).
+
+#### Proctime Attributes
+
+In order to declare a proctime attribute in the schema, you can use Computed Column syntax to declare a computed column which is generated from `PROCTIME()` builtin function.
+The computed column is a virtual column which is not stored in the physical data.

Review comment:
   We should unify the `processing-time` and the `proctime` terms.
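(For reference, the computed-column declaration the quoted section refers to looks roughly as follows; the table and column names are illustrative only.)

```sql
CREATE TABLE Orders (
  order_id BIGINT,
  amount   DECIMAL(10, 2),
  -- processing-time ("proctime") attribute, declared as a virtual computed column
  proc_time AS PROCTIME()
) WITH (
  ...
);
```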
##########
File path: docs/dev/table/connectors/index.md
##########
+This page describes how to declare built-in table sources and/or table sinks and register them in Flink. After a source or sink has been registered, it can be accessed by Table API & SQL statements.
+

Review comment:
   `and/or` -> `and`, `and register them in Flink` -> `in Flink`.
##########
File path: docs/dev/table/connectors/index.md
##########
+In this ways the desired connection properties are converted into normalized, string-based key-value pairs. So-called [table factories](sourceSinks.html#define-a-tablefactory) create configured table sources, table sinks, and corresponding formats from the key-value pairs. All table factories that can be found via Java's [Service Provider Interfaces (SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken into account when searching for exactly-one matching table factory.
+

Review comment:
   What does the `normalized` mean?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org