danny0405 commented on a change in pull request #12386:
URL: https://github.com/apache/flink/pull/12386#discussion_r434281449



##########
File path: docs/dev/table/connectors/index.zh.md
##########
@@ -0,0 +1,268 @@
+---
+title: "Table & SQL Connectors"
+nav-id: sql-connectors
+nav-parent_id: connectors-root
+nav-pos: 2
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Flink's Table API & SQL programs can be connected to other external systems 
for reading and writing both batch and streaming tables. A table source 
provides access to data which is stored in external systems (such as a 
database, key-value store, message queue, or file system). A table sink emits a 
table to an external storage system. Depending on the type of source and sink, 
they support different formats such as CSV, Avro, Parquet, or ORC.
+
+This page describes how to register table sources and table sinks in Flink 
using the natively supported connectors. After a source or sink has been 
registered, it can be accessed by Table API & SQL statements.
+
+<span class="label label-info">NOTE</span> If you want to implement your own 
*custom* table source or sink, have a look at the [user-defined sources & sinks 
page](sourceSinks.html).
+
+<span class="label label-danger">Attention</span> Flink Table & SQL introduces 
a new set of connector options since 1.11.0. If you are using the legacy
connector options, please refer to the [legacy documentation]({{ site.baseurl 
}}/dev/table/connect.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Supported Connectors
+------------
+
+Flink natively supports various connectors. The following table lists all
available connectors.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Name</th>
+        <th class="text-center">Version</th>
+        <th class="text-center">Source</th>
+        <th class="text-center">Sink</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>Filesystem</td>
+      <td></td>
+      <td>Bounded and Unbounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Elasticsearch</td>
+      <td>6.x & 7.x</td>
+      <td>Not supported</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Apache Kafka</td>
+      <td>0.10+</td>
+      <td>Unbounded Scan</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>JDBC</td>
+      <td></td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache 
HBase</a></td>
+      <td>1.4.x</td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    </tbody>
+</table>
+
+{% top %}
+
+How to use connectors
+--------
+
+Flink supports using the SQL CREATE TABLE statement to register a table. One can
define the name of the table, the schema of the table, the connector options 
for connecting to an external system.
+
+The following code shows a full example of how to connect to Kafka for reading 
JSON records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  -- declare the schema of the table
+  `user` BIGINT,
+  message STRING,
+  ts TIMESTAMP,
+  proctime AS PROCTIME(), -- use computed column to define proctime attribute
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- use WATERMARK statement to 
define rowtime attribute
+) WITH (
+  -- declare the external system to connect to
+  'connector' = 'kafka',
+  'topic' = 'topic_name',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'   -- declare a format for this system
+)
+{% endhighlight %}
+</div>
+</div>
+
+In this ways the desired connection properties are converted into normalized, 
string-based key-value pairs. So-called [table 
factories](sourceSinks.html#define-a-tablefactory) create configured table 
sources, table sinks, and corresponding formats from the key-value pairs. All 
table factories that can be found via Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken 
into account when searching for exactly-one matching table factory.
+
+If no factory can be found or multiple factories match for the given 
properties, an exception will be thrown with additional information about 
considered factories and supported properties.
+
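+As a hedged, hypothetical sketch (the table name and the misspelled identifier
below are made up for illustration), a table whose `'connector'` value matches no
factory on the classpath will typically fail in this way when it is used, with the
exception listing the factory identifiers that are available:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE BadTable (
+  id BIGINT
+) WITH (
+  'connector' = 'kafkaa'  -- hypothetical typo: no factory registers this identifier
+)
+{% endhighlight %}
+</div>
+</div>
+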
+{% top %}
+
+Schema Mapping
+------------
+
+The body clause of a SQL `CREATE TABLE` statement defines the names and types 
of columns, constraints and watermarks. Flink doesn't hold the data, thus the 
schema definition only declares how to map types from an external system to 
Flink’s representation. The mapping is not necessarily by field name; it depends
on the implementation of formats and connectors. For example, a MySQL database
table is mapped by field names (case insensitive), and a CSV filesystem table is
mapped by field order (field names can be arbitrary). This will be explained for
every connector.
+
+The following example shows a simple schema without time attributes and 
one-to-one field mapping of input/output to table columns.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
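+For a format that maps by field order, such as CSV, the declared column order is
what determines the mapping. The following sketch is hypothetical (the column
names, path, and option values are made up for illustration) and assumes the
filesystem connector with the CSV format:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyCsvTable (
+  col_a INT,      -- read from the 1st column of each CSV record
+  col_b STRING,   -- read from the 2nd column
+  col_c BOOLEAN   -- read from the 3rd column
+) WITH (
+  'connector' = 'filesystem',
+  'path' = 'file:///path/to/data',
+  'format' = 'csv'
+)
+{% endhighlight %}
+</div>
+</div>
+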
+### Primary Key
+
+Primary key constraints tell that a column or a set of columns of a table are 
unique and they do not contain nulls. Primary key uniquely identify a row in a 
table.
+
+The primary key of a source table is a metadata information for optimization. 
The primary key of a sink table is usually used by the sink implementation for 
upserting.
+
+SQL standard specifies that a constraint can either be ENFORCED or NOT 
ENFORCED. This controls if the constraint checks are performed on the 
incoming/outgoing data. Flink does not own the data therefore the only mode we 
want to support is the NOT ENFORCED mode. It's up to the user to ensure that the
query enforces key integrity.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN,
+  PRIMARY KEY (MyField1, MyField2) NOT ENFORCED  -- defines a primary key on 
columns
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+### Time Attributes
+
+Time attributes are essential when working with unbounded streaming tables. 
Therefore both proctime and rowtime attributes can be defined as part of the 
schema.
+

Review comment:
       Remove the therefore.

##########
File path: docs/dev/table/connectors/hbase.md
##########
@@ -0,0 +1,291 @@
+---
+title: "HBase SQL Connector"
+nav-title: HBase
+nav-parent_id: sql-connectors
+nav-pos: 9
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Bounded</span>
+<span class="label label-primary">Lookup Source: Sync Mode</span>
+<span class="label label-primary">Sink: Batch</span>

Review comment:
       Okay, I didn't notice that :)

##########
File path: docs/dev/table/connectors/index.md
##########
@@ -0,0 +1,268 @@
+---
+title: "Table & SQL Connectors"
+nav-id: sql-connectors
+nav-parent_id: connectors-root
+nav-pos: 2
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Flink's Table API & SQL programs can be connected to other external systems 
for reading and writing both batch and streaming tables. A table source 
provides access to data which is stored in external systems (such as a 
database, key-value store, message queue, or file system). A table sink emits a 
table to an external storage system. Depending on the type of source and sink, 
they support different formats such as CSV, Avro, Parquet, or ORC.
+
+This page describes how to register table sources and table sinks in Flink 
using the natively supported connectors. After a source or sink has been 
registered, it can be accessed by Table API & SQL statements.
+
+<span class="label label-info">NOTE</span> If you want to implement your own 
*custom* table source or sink, have a look at the [user-defined sources & sinks 
page](sourceSinks.html).
+
+<span class="label label-danger">Attention</span> Flink Table & SQL introduces 
a new set of connector options since 1.11.0. If you are using the legacy
connector options, please refer to the [legacy documentation]({{ site.baseurl 
}}/dev/table/connect.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Supported Connectors
+------------
+
+Flink natively supports various connectors. The following table lists all
available connectors.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Name</th>
+        <th class="text-center">Version</th>
+        <th class="text-center">Source</th>
+        <th class="text-center">Sink</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>Filesystem</td>
+      <td></td>
+      <td>Bounded and Unbounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Elasticsearch</td>
+      <td>6.x & 7.x</td>
+      <td>Not supported</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Apache Kafka</td>
+      <td>0.10+</td>
+      <td>Unbounded Scan</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>JDBC</td>
+      <td></td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache 
HBase</a></td>
+      <td>1.4.x</td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    </tbody>
+</table>
+
+{% top %}
+
+How to use connectors
+--------
+
+Flink supports using the SQL CREATE TABLE statement to register a table. One can
define the name of the table, the schema of the table, the connector options 
for connecting to an external system.
+

Review comment:
       `One can define ...` -> `One can define the table name, the table schema
and the table options for ...`

##########
File path: docs/dev/table/connectors/index.zh.md
##########
@@ -0,0 +1,268 @@
+---
+title: "Table & SQL Connectors"
+nav-id: sql-connectors
+nav-parent_id: connectors-root
+nav-pos: 2
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Flink's Table API & SQL programs can be connected to other external systems 
for reading and writing both batch and streaming tables. A table source 
provides access to data which is stored in external systems (such as a 
database, key-value store, message queue, or file system). A table sink emits a 
table to an external storage system. Depending on the type of source and sink, 
they support different formats such as CSV, Avro, Parquet, or ORC.
+
+This page describes how to register table sources and table sinks in Flink 
using the natively supported connectors. After a source or sink has been 
registered, it can be accessed by Table API & SQL statements.
+
+<span class="label label-info">NOTE</span> If you want to implement your own 
*custom* table source or sink, have a look at the [user-defined sources & sinks 
page](sourceSinks.html).
+
+<span class="label label-danger">Attention</span> Flink Table & SQL introduces 
a new set of connector options since 1.11.0. If you are using the legacy
connector options, please refer to the [legacy documentation]({{ site.baseurl 
}}/dev/table/connect.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Supported Connectors
+------------
+
+Flink natively supports various connectors. The following table lists all
available connectors.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Name</th>
+        <th class="text-center">Version</th>
+        <th class="text-center">Source</th>
+        <th class="text-center">Sink</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>Filesystem</td>
+      <td></td>
+      <td>Bounded and Unbounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Elasticsearch</td>
+      <td>6.x & 7.x</td>
+      <td>Not supported</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Apache Kafka</td>
+      <td>0.10+</td>
+      <td>Unbounded Scan</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>JDBC</td>
+      <td></td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache 
HBase</a></td>
+      <td>1.4.x</td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    </tbody>
+</table>
+
+{% top %}
+
+How to use connectors
+--------
+
+Flink supports using the SQL CREATE TABLE statement to register a table. One can
define the name of the table, the schema of the table, the connector options 
for connecting to an external system.
+
+The following code shows a full example of how to connect to Kafka for reading 
JSON records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  -- declare the schema of the table
+  `user` BIGINT,
+  message STRING,
+  ts TIMESTAMP,
+  proctime AS PROCTIME(), -- use computed column to define proctime attribute
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- use WATERMARK statement to 
define rowtime attribute
+) WITH (
+  -- declare the external system to connect to
+  'connector' = 'kafka',
+  'topic' = 'topic_name',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'   -- declare a format for this system
+)
+{% endhighlight %}
+</div>
+</div>
+
+In this ways the desired connection properties are converted into normalized, 
string-based key-value pairs. So-called [table 
factories](sourceSinks.html#define-a-tablefactory) create configured table 
sources, table sinks, and corresponding formats from the key-value pairs. All 
table factories that can be found via Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken 
into account when searching for exactly-one matching table factory.
+
+If no factory can be found or multiple factories match for the given 
properties, an exception will be thrown with additional information about 
considered factories and supported properties.
+
+{% top %}
+
+Schema Mapping
+------------
+
+The body clause of a SQL `CREATE TABLE` statement defines the names and types 
of columns, constraints and watermarks. Flink doesn't hold the data, thus the 
schema definition only declares how to map types from an external system to 
Flink’s representation. The mapping is not necessarily by field name; it depends
on the implementation of formats and connectors. For example, a MySQL database
table is mapped by field names (case insensitive), and a CSV filesystem table is
mapped by field order (field names can be arbitrary). This will be explained for
every connector.
+
+The following example shows a simple schema without time attributes and 
one-to-one field mapping of input/output to table columns.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyTable (
+  MyField1 INT,
+  MyField2 STRING,
+  MyField3 BOOLEAN
+) WITH (
+  ...
+)
+{% endhighlight %}
+</div>
+</div>
+
+### Primary Key
+
+Primary key constraints tell that a column or a set of columns of a table are 
unique and they do not contain nulls. Primary key uniquely identify a row in a 
table.
+

Review comment:
       `identify` -> `identifies`.

##########
File path: docs/dev/table/connectors/index.md
##########
@@ -0,0 +1,268 @@
+---
+title: "Table & SQL Connectors"
+nav-id: sql-connectors
+nav-parent_id: connectors-root
+nav-pos: 2
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Flink's Table API & SQL programs can be connected to other external systems 
for reading and writing both batch and streaming tables. A table source 
provides access to data which is stored in external systems (such as a 
database, key-value store, message queue, or file system). A table sink emits a 
table to an external storage system. Depending on the type of source and sink, 
they support different formats such as CSV, Avro, Parquet, or ORC.
+
+This page describes how to declare built-in table sources and/or table sinks 
and register them in Flink. After a source or sink has been registered, it can 
be accessed by Table API & SQL statements.
+
+<span class="label label-info">NOTE</span> If you want to implement your own 
*custom* table source or sink, have a look at the [user-defined sources & sinks 
page](sourceSinks.html).
+
+<span class="label label-danger">Attention</span> Flink Table & SQL introduces 
a new set of connector options since 1.11.0. If you are using the legacy
connector options, please refer to the [legacy documentation]({{ site.baseurl 
}}/dev/table/connect.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Supported Connectors
+------------
+
+Flink natively supports various connectors. The following table lists all
available connectors.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Name</th>
+        <th class="text-center">Version</th>
+        <th class="text-center">Source</th>
+        <th class="text-center">Sink</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>Filesystem</td>
+      <td></td>
+      <td>Bounded and Unbounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Elasticsearch</td>
+      <td>6.x & 7.x</td>
+      <td>Not supported</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Apache Kafka</td>
+      <td>0.10+</td>
+      <td>Unbounded Scan</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>JDBC</td>
+      <td></td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache 
HBase</a></td>
+      <td>1.4.x</td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    </tbody>
+</table>
+
+{% top %}
+
+How to use connectors
+--------
+
+Flink supports using the SQL CREATE TABLE statement to register a table. One can
define the name of the table, the schema of the table, the connector options 
for connecting to an external system.
+
+The following code shows a full example of how to connect to Kafka for reading 
JSON records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  -- declare the schema of the table
+  `user` BIGINT,
+  message STRING,
+  ts TIMESTAMP,
+  proctime AS PROCTIME(), -- use computed column to define proctime attribute
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- use WATERMARK statement to 
define rowtime attribute
+) WITH (
+  -- declare the external system to connect to
+  'connector' = 'kafka',
+  'topic' = 'topic_name',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'   -- declare a format for this system
+)
+{% endhighlight %}
+</div>
+</div>
+
+In this ways the desired connection properties are converted into normalized, 
string-based key-value pairs. So-called [table 
factories](sourceSinks.html#define-a-tablefactory) create configured table 
sources, table sinks, and corresponding formats from the key-value pairs. All 
table factories that can be found via Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken 
into account when searching for exactly-one matching table factory.
+

Review comment:
       `In this ways` -> `In these ways` or `In this way`

##########
File path: docs/dev/table/connectors/index.md
##########
@@ -0,0 +1,268 @@
+---
+title: "Table & SQL Connectors"
+nav-id: sql-connectors
+nav-parent_id: connectors-root
+nav-pos: 2
+nav-show_overview: true
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+
+Flink's Table API & SQL programs can be connected to other external systems 
for reading and writing both batch and streaming tables. A table source 
provides access to data which is stored in external systems (such as a 
database, key-value store, message queue, or file system). A table sink emits a 
table to an external storage system. Depending on the type of source and sink, 
they support different formats such as CSV, Avro, Parquet, or ORC.
+
+This page describes how to declare built-in table sources and/or table sinks 
and register them in Flink. After a source or sink has been registered, it can 
be accessed by Table API & SQL statements.
+
+<span class="label label-info">NOTE</span> If you want to implement your own 
*custom* table source or sink, have a look at the [user-defined sources & sinks 
page](sourceSinks.html).
+
+<span class="label label-danger">Attention</span> Flink Table & SQL introduces 
a new set of connector options since 1.11.0. If you are using the legacy
connector options, please refer to the [legacy documentation]({{ site.baseurl 
}}/dev/table/connect.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Supported Connectors
+------------
+
+Flink natively supports various connectors. The following table lists all
available connectors.
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left">Name</th>
+        <th class="text-center">Version</th>
+        <th class="text-center">Source</th>
+        <th class="text-center">Sink</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td>Filesystem</td>
+      <td></td>
+      <td>Bounded and Unbounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Elasticsearch</td>
+      <td>6.x & 7.x</td>
+      <td>Not supported</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>Apache Kafka</td>
+      <td>0.10+</td>
+      <td>Unbounded Scan</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td>JDBC</td>
+      <td></td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    <tr>
+      <td><a href="{{ site.baseurl }}/dev/table/connectors/hbase.html">Apache 
HBase</a></td>
+      <td>1.4.x</td>
+      <td>Bounded Scan, Lookup</td>
+      <td>Streaming Sink, Batch Sink</td>
+    </tr>
+    </tbody>
+</table>
+
+{% top %}
+
+How to use connectors
+--------
+
+Flink supports using the SQL CREATE TABLE statement to register a table. One can
define the name of the table, the schema of the table, the connector options 
for connecting to an external system.
+
+The following code shows a full example of how to connect to Kafka for reading 
JSON records.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE MyUserTable (
+  -- declare the schema of the table
+  `user` BIGINT,
+  message STRING,
+  ts TIMESTAMP,
+  proctime AS PROCTIME(), -- use computed column to define proctime attribute
+  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- use WATERMARK statement to 
define rowtime attribute
+) WITH (
+  -- declare the external system to connect to
+  'connector' = 'kafka',
+  'topic' = 'topic_name',
+  'scan.startup.mode' = 'earliest-offset',
+  'properties.bootstrap.servers' = 'localhost:9092',
+  'format' = 'json'   -- declare a format for this system
+)
+{% endhighlight %}
+</div>
+</div>
+
+In this ways the desired connection properties are converted into normalized, 
string-based key-value pairs. So-called [table 
factories](sourceSinks.html#define-a-tablefactory) create configured table 
sources, table sinks, and corresponding formats from the key-value pairs. All 
table factories that can be found via Java's [Service Provider Interfaces 
(SPI)](https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) are taken 
into account when searching for exactly-one matching table factory.
+

Review comment:
       Lower case seems better; the `normalized` is confusing.

##########
File path: docs/dev/table/connectors/hbase.md
##########
@@ -0,0 +1,291 @@
+---
+title: "HBase SQL Connector"
+nav-title: HBase
+nav-parent_id: sql-connectors
+nav-pos: 9
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Bounded</span>
+<span class="label label-primary">Lookup Source: Sync Mode</span>
+<span class="label label-primary">Sink: Batch</span>
+<span class="label label-primary">Sink: Streaming Upsert Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The HBase connector allows for reading from and writing to an HBase cluster. 
This document describes how to set up the HBase connector to run SQL queries
against HBase.
+
+The connector can operate in upsert mode for exchanging changelog messages with
the external system using a primary key defined in the DDL. However, the primary key
can only be defined on the HBase rowkey field. If the PRIMARY KEY clause is not 
declared, the HBase connector will take rowkey as the primary key by default.
+
+<span class="label label-danger">Attention</span> HBase as a Lookup Source 
does not use any caching; data is always queried directly through the HBase 
client.
+
+Dependencies
+------------
+
+In order to set up the HBase connector, the following table provides dependency
information for both projects using a build automation tool (such as Maven or 
SBT) and SQL Client with SQL JAR bundles.
+
+{% if site.is_stable %}
+
+| HBase Version       | Maven dependency                                       
   | SQL Client JAR         |
+| :------------------ | 
:-------------------------------------------------------- | 
:----------------------|
+| 1.4.x               | `flink-connector-hbase{{site.scala_version_suffix}}`   
  | 
[Download](https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-hbase{{site.scala_version_suffix}}/{{site.version}}/flink-connector-hbase{{site.scala_version_suffix}}-{{site.version}}.jar)
 |
+
+{% else %}
+
+The dependency table is only available for stable releases.
+
+{% endif %}
+
+How to create an HBase table
+----------------
+
+All the column families in an HBase table must be declared as ROW type; the field
name maps to the column family name, and the nested field names map to the
column qualifier names. There is no need to declare all the families and
qualifiers in the schema; users can declare only what is necessary. Apart from the
ROW type fields, the single field of atomic type (e.g. STRING, BIGINT) will be
recognized as the HBase rowkey. The rowkey field can have an arbitrary name.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE hTable (
+ rowkey INT,
+ family1 ROW<q1 INT>,
+ family2 ROW<q2 STRING, q3 BIGINT>,
+ family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
+ PRIMARY KEY (rowkey) NOT ENFORCED
+) WITH (
+ 'connector' = 'hbase-1.4',
+ 'table-name' = 'mytable',
+ 'zookeeper.quorum' = 'localhost:2121'
+)
+{% endhighlight %}
+</div>
+</div>
+
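+As a hedged sketch of how such a table could be written to, each declared column
family is assembled with the `ROW(...)` constructor. The source table and its flat
columns below are hypothetical:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+-- assumes a source table with columns rowkey, q1, q2, q3, q4, q5, q6
+INSERT INTO hTable
+SELECT rowkey, ROW(q1), ROW(q2, q3), ROW(q4, q5, q6)
+FROM sourceTable;
+{% endhighlight %}
+</div>
+</div>
+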
+Connector Options
+----------------
+
+<table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 50%">Description</th>
+      </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use, here should be 'hbase-1.4'.</td>
+    </tr>
+    <tr>
+      <td><h5>table-name</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of HBase table to connect.</td>
+    </tr>
+    <tr>
+      <td><h5>zookeeper.quorum</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The HBase Zookeeper quorum.</td>
+    </tr>
+    <tr>
+      <td><h5>zookeeper.znode.parent</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">/hbase</td>
+      <td>String</td>
+      <td>The root dir in Zookeeper for HBase cluster</td>
+    </tr>
+    <tr>
+      <td><h5>null-string-literal</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">null</td>
+      <td>String</td>
+      <td>Representation for null values for string fields. HBase source and 
sink encodes/decodes empty bytes as null values for all types except string 
type.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.buffer-flush.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">2mb</td>
+      <td>MemorySize</td>
+      <td>Writing option, determines how many size in memory of buffered rows 
to insert per round trip.
+      This can improve performance for writing data to HBase database, but may 
increase the latency.

Review comment:
       The latest change looks good.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

