alpinegizmo commented on a change in pull request #13273:
URL: https://github.com/apache/flink/pull/13273#discussion_r481004495



##########
File path: docs/dev/python/user-guide/table/10_minutes_to_table_api.md
##########
@@ -0,0 +1,739 @@
+---
+title: "10 Minutes to Python Table API"
+nav-parent_id: python_tableapi
+nav-pos: 25
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+This document is a short introduction to PyFlink Table API, which is used to help novice users quickly understand the basic usage of PyFlink Table API.
+For advanced usage, please refer to other documents in this User Guide.
+
+* This will be replaced by the TOC
+{:toc}
+
+Common Structure of Python Table API Program 
+--------------------------------------------
+
+All Table API and SQL programs, both batch and streaming, follow the same pattern. The following code example shows the common structure of Table API and SQL programs.
+
+{% highlight python %}
+
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+
+# 1. create a TableEnvironment
+env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = StreamTableEnvironment.create(environment_settings=env_settings)
+
+# 2. create source Table
+table_env.execute_sql("""
+    CREATE TABLE datagen (
+        id INT,
+        data STRING
+    ) WITH (
+        'connector' = 'datagen',
+        'fields.id.kind' = 'sequence',
+        'fields.id.start' = '1',
+        'fields.id.end' = '10'
+    )
+""")
+
+# 3. create sink Table
+table_env.execute_sql("""
+    CREATE TABLE print (
+        id INT,
+        data STRING
+    ) WITH (
+        'connector' = 'print'
+    )
+""")
+
+# 4. query from source table and perform calculations
+# create a Table from a Table API query:
+source_table = table_env.from_path("datagen")
+# or create a Table from a SQL query:
+source_table = table_env.sql_query("SELECT * FROM datagen")
+
+result_table = source_table.select("id + 1, data")
+
+# 5. emit query result to sink table
+# emit a Table API result Table to a sink table:
+result_table.execute_insert("print").get_job_client().get_job_execution_result().result()
+# or emit results via SQL query:
+table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").get_job_client().get_job_execution_result().result()
+
+{% endhighlight %}
+
+{% top %}
+
+Create a TableEnvironment
+---------------------------
+
+The `TableEnvironment` is a central concept of the Table API and SQL integration. The following code example shows how to create a TableEnvironment:
+
+{% highlight python %}
+
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment, BatchTableEnvironment
+
+# create a blink streaming TableEnvironment
+env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = StreamTableEnvironment.create(environment_settings=env_settings)
+
+# create a blink batch TableEnvironment
+env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = BatchTableEnvironment.create(environment_settings=env_settings)
+
+# create a flink streaming TableEnvironment
+env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_old_planner().build()
+table_env = StreamTableEnvironment.create(environment_settings=env_settings)
+
+# create a flink batch TableEnvironment
+env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_old_planner().build()
+table_env = BatchTableEnvironment.create(environment_settings=env_settings)
+
+{% endhighlight %}
+
+The `TableEnvironment` is responsible for:
+
+* Creating `Table`s
+* Registering `Table`s as a temporary view
+* Executing SQL queries, see [SQL]({% link dev/table/sql/index.md %}) for more details
+* Registering user-defined (scalar, table, or aggregation) functions, see [General User-defined Functions]({% link dev/python/user-guide/table/udfs/python_udfs.md %}) and [Vectorized User-defined Functions]({% link dev/python/user-guide/table/udfs/vectorized_python_udfs.md %}) for more details
+* Configuring the job, see [Python Configuration]({% link dev/python/user-guide/table/python_config.md %}) for more details
+* Managing Python dependencies, see [Dependency Management]({% link dev/python/user-guide/table/dependency_management.md %}) for more details
+* Submitting the jobs for execution
+
+Currently there are 2 planners available: flink planner and blink planner.
+
+You should explicitly set which planner to use in the current program.
+We recommend using the blink planner as much as possible. 
+
+{% top %}
+
+Create Tables
+---------------
+
+`Table` is a core component of the Python Table API. A `Table` is a logical representation of the intermediate result of a Table API Job.
+
+A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine tables from different TableEnvironments in the same query, e.g., to join or union them.
+
+### Create using a List Object
+
+You can create a Table from a list object:
+
+{% highlight python %}
+
+# create a blink batch TableEnvironment
+from pyflink.table import EnvironmentSettings, BatchTableEnvironment
+
+env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = BatchTableEnvironment.create(environment_settings=env_settings)
+
+table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
+table.to_pandas()
+
+{% endhighlight %}
+
+The result is:
+
+{% highlight text %}
+   _1     _2
+0   1     Hi
+1   2  Hello
+{% endhighlight %}
+
+You can also create the Table with specified column names:
+
+{% highlight python %}
+
+table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
+table.to_pandas()
+
+{% endhighlight %}
+
+The result is:
+
+{% highlight text %}
+   id   data
+0   1     Hi
+1   2  Hello
+{% endhighlight %}
+
+By default the table schema is extracted from the data automatically. 
+
+If the table schema is not as your wish, you can specify it manually:

Review comment:
       ```suggestion
   If the automatically generated table schema isn't satisfactory, you can specify it manually:
   ```
   
   

##########
File path: docs/dev/python/user-guide/table/10_minutes_to_table_api.md
##########
@@ -0,0 +1,739 @@
+---
+title: "10 Minutes to Python Table API"

Review comment:
       ```suggestion
   title: "Intro to the Python Table API"
   ```

##########
File path: docs/dev/python/user-guide/table/10_minutes_to_table_api.md
##########
@@ -0,0 +1,739 @@
+This document is a short introduction to PyFlink Table API, which is used to help novice users quickly understand the basic usage of PyFlink Table API.

Review comment:
       ```suggestion
   This document is a short introduction to the PyFlink Table API, which is used to help novice users quickly understand the basic usage of PyFlink Table API.
   ```

##########
File path: docs/dev/python/user-guide/table/10_minutes_to_table_api.md
##########
@@ -0,0 +1,739 @@
+By default the table schema is extracted from the data automatically. 
+
+If the table schema is not as your wish, you can specify it manually:
+
+{% highlight python %}
+
+table_without_schema = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
+# by default the type of the "id" column is 64 bit int
+default_type = table_without_schema.to_pandas()["id"].dtype
+print('By default the type of the "id" column is %s.' % default_type)
+
+from pyflink.table import DataTypes
+table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
+                                DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
+                                               DataTypes.FIELD("data", DataTypes.STRING())]))
+# now the type of the "id" column is 8 bit int
+type = table.to_pandas()["id"].dtype
+print('Now the type of the "id" column is %s.' % type)
+
+{% endhighlight %}
+
+The result is:
+
+{% highlight text %}
+By default the type of the "id" column is int64.
+Now the type of the "id" column is int8.
+{% endhighlight %}
+
+### Create using a Connector
+
+You can create a Table using connector DDL:
+
+{% highlight python %}
+# create a blink stream TableEnvironment
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+
+env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = StreamTableEnvironment.create(environment_settings=env_settings)
+
+table_env.execute_sql("""
+    CREATE TABLE random_source (
+        id BIGINT, 
+        data TINYINT 
+    ) WITH (
+        'connector' = 'datagen',
+        'fields.id.kind'='sequence',
+        'fields.id.start'='1',
+        'fields.id.end'='3',
+        'fields.data.kind'='sequence',
+        'fields.data.start'='4',
+        'fields.data.end'='6'
+    )
+""")
+table = table_env.from_path("random_source")
+table.to_pandas()
+
+{% endhighlight %}
+
+The result is:
+
+{% highlight text %}
+   id  data
+0   2     5
+1   1     4
+2   3     6
+{% endhighlight %}
+
+### Create using a Catalog
+
+A `TableEnvironment` maintains a map of catalogs of tables which are created with an identifier.
+
+The tables in a catalog may either be temporary, and tied to the lifecycle of a single Flink session, or permanent, and visible across multiple Flink sessions.
+
+The tables and views created via SQL DDL, e.g. "create table ..." and "create view ..." are also stored in a catalog.
+
+You can directly access the tables in a catalog via SQL.
+
+If you want to use tables from a catalog with the Table API, you can use the "from_path" method to create the Table API objects:
+
+{% highlight python %}
+
+# prepare the catalog
+# register Table API tables in the catalog
+table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
+table_env.create_temporary_view('source_table', table)
+
+# create Table API table from catalog
+new_table = table_env.from_path('source_table')
+new_table.to_pandas()
+
+{% endhighlight %}
+
+The result is:
+
+{% highlight text %}
+   id   data
+0   1     Hi
+1   2  Hello
+{% endhighlight %}
+
+{% top %}
+
+Write Queries
+---------------
+
+### Write Table API Queries
+
+The `Table` object offers many methods for applying relational operations. 
+These methods return new `Table` objects representing the result of applying the relational operations on the input `Table`. 
+These relational operations may be composed of multiple method calls, such as `table.group_by(...).select(...)`.
+
+The [Table API]({% link dev/table/tableApi.md %}?code_tab=python) documentation describes all Table API operations that are supported on streaming and batch tables.
+
+The following example shows a simple Table API aggregation query:
+
+{% highlight python %}
+
+# using batch table environment to execute the queries
+from pyflink.table import EnvironmentSettings, BatchTableEnvironment
+
+env_settings = EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build()
+table_env = BatchTableEnvironment.create(environment_settings=env_settings)
+
+orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
+                                 ['name', 'country', 'revenue'])
+# compute revenue for all customers from France
+revenue = orders \
+    .select("name, country, revenue") \
+    .where("country === 'FRANCE'") \
+    .group_by("name") \
+    .select("name, revenue.sum AS rev_sum")
+    
+revenue.to_pandas()
+
+{% endhighlight %}
+
+The result is:
+
+{% highlight text %}
+   name  rev_sum
+0  Jack       30
+{% endhighlight %}
+
+### Write SQL Queries
+
+Flink's SQL integration is based on [Apache Calcite](https://calcite.apache.org), which implements the SQL standard. SQL queries are specified as Strings.
+
+The [SQL]({% link dev/table/sql/index.md %}) documentation describes Flink's SQL support for streaming and batch tables.
+
+The following example shows a simple SQL aggregation query:
+
+{% highlight python %}
+
+# use a StreamTableEnvironment to execute the queries
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
+
+env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
+table_env = StreamTableEnvironment.create(environment_settings=env_settings)
+
+
+table_env.execute_sql("""
+    CREATE TABLE random_source (
+        id BIGINT, 
+        data TINYINT
+    ) WITH (
+        'connector' = 'datagen',
+        'fields.id.kind'='sequence',
+        'fields.id.start'='1',
+        'fields.id.end'='8',
+        'fields.data.kind'='sequence',
+        'fields.data.start'='4',
+        'fields.data.end'='11'
+    )
+""")
+
+table_env.execute_sql("""
+    CREATE TABLE print_sink (
+        id BIGINT, 
+        data_sum TINYINT 
+    ) WITH (
+        'connector' = 'print'
+    )
+""")
+
+table_env.execute_sql("""
+    INSERT INTO print_sink
+        SELECT id, sum(data) as data_sum FROM 
+            (SELECT id / 2 as id, data FROM random_source)
+        WHERE id > 1
+        GROUP BY id
+""").get_job_client().get_job_execution_result().result()
+
+{% endhighlight %}
+
+The result is:
+
+{% highlight text %}
+2> +I(4,11)
+6> +I(2,8)
+8> +I(3,10)
+6> -U(2,8)
+8> -U(3,10)
+6> +U(2,15)
+8> +U(3,19)
+{% endhighlight %}
+
+In fact, this shows the change logs received by the print sink.
+The output format of a change log is:
+{% highlight python %}

Review comment:
       ```suggestion
   {% highlight text %}
   ```
   
   Highlighting this as python creates distracting artifacts in the rendered HTML.
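
For readers following the quoted change log output, here is a minimal, stdlib-only sketch of how those lines can be read. It assumes each line has the shape `<subtask>> <kind>(<values>)`, as in the quoted sample, where `+I` is an insert, `-U` retracts the old version of an updated row, `+U` adds the new version, and `-D` is a delete. The names `parse_line` and `materialize` are hypothetical helpers for illustration and are not part of PyFlink.

```python
import re
from typing import List, Set, Tuple

# Assumed line shape, inferred from the quoted sample output:
#   "<subtask>> <kind>(<v1>,<v2>,...)"
LINE_RE = re.compile(r"^(\d+)> ([+-][IUD])\((.*)\)$")


def parse_line(line: str) -> Tuple[int, str, Tuple[int, ...]]:
    """Split one print-sink line into (subtask, change kind, integer values)."""
    match = LINE_RE.match(line.strip())
    if match is None:
        raise ValueError("unrecognized change log line: %r" % line)
    subtask, kind, values = match.groups()
    return int(subtask), kind, tuple(int(v) for v in values.split(","))


def materialize(lines: List[str]) -> Set[Tuple[int, ...]]:
    """Replay the change log: +I/+U add a row, -U/-D retract it.

    A set is enough here because the sample contains no duplicate rows.
    """
    rows: Set[Tuple[int, ...]] = set()
    for line in lines:
        _, kind, values = parse_line(line)
        if kind in ("+I", "+U"):
            rows.add(values)
        else:
            rows.discard(values)
    return rows


# The change log quoted in the diff above (without the leading diff "+").
log = [
    "2> +I(4,11)", "6> +I(2,8)", "8> +I(3,10)",
    "6> -U(2,8)", "8> -U(3,10)", "6> +U(2,15)", "8> +U(3,19)",
]
final_rows = sorted(materialize(log))
print(final_rows)  # [(2, 15), (3, 19), (4, 11)]
```

Replaying the log this way leaves one row per group, which matches the sums the quoted SQL query produces for the ids 2, 3, and 4.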

##########
File path: docs/dev/python/user-guide/table/10_minutes_to_table_api.md
##########
@@ -0,0 +1,739 @@
+orders = table_env.from_elements([('Jack', 'FRANCE', 10), ('Rose', 'ENGLAND', 30), ('Jack', 'FRANCE', 20)],
+                                 ['name', 'country', 'revenue'])
+# compute revenue for all customers from France

Review comment:
       ```suggestion
   
   # compute revenue for all customers from France
   ```
   
   A blank line will improve readability here.



