luoyuxia commented on code in PR #22837:
URL: https://github.com/apache/flink/pull/22837#discussion_r1241802885


##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -50,52 +50,477 @@ Due to the declarative nature of Table API & SQL programs, 
it is not always obvi
 state is used within a pipeline. The planner decides whether state is 
necessary to compute a correct
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
-
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
+#### Stateful Operators
 
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries that contain stateful operations such as [joins]({{< ref "docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}),
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in fault-tolerant storage, for which Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+The `word` field is used as the grouping key, and the continuous query writes a count
+for each `word` it observes to the sink.
+The `word` value evolves over time, and since the continuous query never ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query grows continuously as more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="60%">}}
 
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field projections or filters are usually
+stateless pipelines.
+However, in some situations a stateful operation is implicitly derived from the characteristics of the input (*e.g.*, the input is a changelog without *UPDATE_BEFORE*, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#table-to-stream-conversion)),
+or through user configuration (see [`table.exec.source.cdc-events-duplicate`]({{< ref "docs/dev/table/config" >}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that queries an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kafka (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kafka;
 ```
+The table source only provides messages of type *INSERT*, *UPDATE_AFTER*, and *DELETE*, while the downstream sink requires a complete changelog (including *UPDATE_BEFORE*).
+As a result, although this query itself does not involve explicit stateful computation, the planner still generates a stateful operator called "ChangelogNormalize" to obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="60%">}}
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a `word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
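+
+As a sketch, the retention can be set globally from the SQL CLI before the query is submitted (the one-hour value is illustrative):
+
+```sql
+-- applies to all stateful operators of the pipeline
+SET 'table.exec.state.ttl' = '1 h';
+```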
+
+#### Configure Operator-level State TTL
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only suitable for cases
+in which the pipeline uses multiple states
+and you need to set a different TTL (time-to-live) for each state.
+If the pipeline does not involve stateful computations, you do not need to follow this procedure.
+If the pipeline uses only one state, you only need to set [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
+at the pipeline level.
+{{< /hint >}}
+
+Starting from Flink v1.18, Table API & SQL supports configuring fine-grained state TTL at the operator level to improve state usage.
+The configurable granularity is one TTL per incoming input edge of each stateful operator.
+Specifically, a `OneInputStreamOperator` can configure the TTL of one state, while a `TwoInputStreamOperator` (such as a regular join), which has two inputs, can configure the TTL of the left and right state separately.
+More generally, for a `MultipleInputStreamOperator` with K inputs, K state TTLs can be configured.
+
+Typical use cases are as follows. 

Review Comment:
   nit:
   ```suggestion
   Typical use cases are as follows:
   ```



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -50,52 +50,477 @@ Due to the declarative nature of Table API & SQL programs, 
it is not always obvi
 state is used within a pipeline. The planner decides whether state is 
necessary to compute a correct
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
-
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
+#### Stateful Operators
 
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries that contain stateful operations such as [joins]({{< ref "docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}),
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in fault-tolerant storage, for which Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+The `word` field is used as the grouping key, and the continuous query writes a count
+for each `word` it observes to the sink.
+The `word` value evolves over time, and since the continuous query never ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query grows continuously as more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="60%">}}
 
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field projections or filters are usually
+stateless pipelines.
+However, in some situations a stateful operation is implicitly derived from the characteristics of the input (*e.g.*, the input is a changelog without *UPDATE_BEFORE*, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#table-to-stream-conversion)),
+or through user configuration (see [`table.exec.source.cdc-events-duplicate`]({{< ref "docs/dev/table/config" >}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that queries an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kafka (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kafka;
 ```
+The table source only provides messages of type *INSERT*, *UPDATE_AFTER*, and *DELETE*, while the downstream sink requires a complete changelog (including *UPDATE_BEFORE*).
+As a result, although this query itself does not involve explicit stateful computation, the planner still generates a stateful operator called "ChangelogNormalize" to obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="60%">}}
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a `word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
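+
+As a sketch, the retention can be set globally from the SQL CLI before the query is submitted (the one-hour value is illustrative):
+
+```sql
+-- applies to all stateful operators of the pipeline
+SET 'table.exec.state.ttl' = '1 h';
+```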
+
+#### Configure Operator-level State TTL
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only suitable for cases
+in which the pipeline uses multiple states
+and you need to set a different TTL (time-to-live) for each state.
+If the pipeline does not involve stateful computations, you do not need to follow this procedure.
+If the pipeline uses only one state, you only need to set [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
+at the pipeline level.
+{{< /hint >}}
+
+Starting from Flink v1.18, Table API & SQL supports configuring fine-grained state TTL at the operator level to improve state usage.
+The configurable granularity is one TTL per incoming input edge of each stateful operator.
+Specifically, a `OneInputStreamOperator` can configure the TTL of one state, while a `TwoInputStreamOperator` (such as a regular join), which has two inputs, can configure the TTL of the left and right state separately.
+More generally, for a `MultipleInputStreamOperator` with K inputs, K state TTLs can be configured.
+
+Typical use cases are as follows. 
+- Set different TTLs for the two inputs of [regular joins]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins).
+A regular join generates a `TwoInputStreamOperator` with a left state to keep the left input and a right state to keep the right input. From Flink v1.18,
+you can set different state TTLs for the left and the right state.
+- Set different TTLs for different transformations within one pipeline.
+For example, consider an ETL pipeline which uses `ROW_NUMBER` to perform [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}}),
+and then uses `GROUP BY` to perform [aggregation]({{< ref "docs/dev/table/sql/queries/group-agg" >}}).
+This table program generates two `OneInputStreamOperator`s, each with its own state.
+You can then set different state TTLs for the deduplication state and the aggregation state (see the sketch after this list).
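+
+A minimal sketch of such a two-stage pipeline is shown below; the table names, the `proctime` time attribute, and the schemas are hypothetical:
+
+```sql
+-- stage 1: deduplicate page views per (page_id, user_id) via ROW_NUMBER
+-- stage 2: aggregate the deduplicated rows per page_id
+INSERT INTO uv_per_page
+SELECT page_id, COUNT(*) AS uv
+FROM (
+    SELECT page_id, user_id
+    FROM (
+        SELECT *,
+            ROW_NUMBER() OVER (PARTITION BY page_id, user_id ORDER BY proctime ASC) AS row_num
+        FROM page_views
+    )
+    WHERE row_num = 1
+)
+GROUP BY page_id;
+```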
+
+{{< hint info >}}
+Window-based operations (like [Window Join]({{< ref 
"docs/dev/table/sql/queries/window-join" >}}), [Window Aggregation]({{< ref 
"docs/dev/table/sql/queries/window-agg" >}}), [Window Top-N]({{< ref 
"docs/dev/table/sql/queries/window-topn" >}}) *etc.*) and [Interval Joins]({{< 
ref "docs/dev/table/sql/queries/joins" >}}#interval-joins) do not rely on 
`table.exec.state.ttl` to control the state retention, and their states cannot 
be configured at operator-level.
+
+{{< /hint >}}
+
+**Generate a Compiled Plan**
+
+The setup process begins with generating a JSON file via the `COMPILE PLAN` statement;
+the file represents the serialized execution plan of the current table program.
+{{< hint info >}}
+Currently, the `COMPILE PLAN` statement does not support `SELECT ... FROM ...` queries.
+{{< /hint >}}
+
+- Run a `COMPILE PLAN` statement
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompiledPlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompiledPlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL Syntax
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR 
<insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    This will generate a JSON file at `/path/to/plan.json`.
+
+{{< hint info >}}
+The `COMPILE PLAN` statement supports writing the plan to a remote [filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) scheme such as `hdfs://` or `s3://`.
+Please make sure that you have write access to the target path.
+{{< /hint >}}
+
+**Modify the Compiled Plan**
+
+Every operator that uses state explicitly generates a JSON array named "state" with the following structure.
+In theory, an operator with k inputs has k state entries.
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+Locate the operator for which you need to set the TTL, change the TTL to a positive integer (note that the time unit is milliseconds),
+save the file, and then use the `EXECUTE PLAN` statement to submit your job.
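+
+For example, a sketch of a modified "state" array for a regular join operator that keeps the left state for one day and the right state for one hour (the TTL values are illustrative):
+
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "86400000 ms",
+      "name": "leftState"
+    },
+    {
+      "index": 1,
+      "ttl": "3600000 ms",
+      "name": "rightState"
+    }
+  ]
+```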
+
+
+{{< hint info >}}
+Conceptually, the TTL of a downstream stateful operator should be greater than or equal to the TTL of an upstream stateful operator.
+{{< /hint >}}
+
+**Execute the Compiled Plan**
+
+The `EXECUTE PLAN` statement deserializes the specified file back into the execution plan of the table program and then submits the job.
+A job submitted via the `EXECUTE PLAN` statement applies the state TTLs read from the file instead of the configuration `table.exec.state.ttl`.
+
+- Run an `EXECUTE PLAN` statement
+{{< tabs "execute-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// PlanReference#fromFile only supports a local file path; if you need to read from a remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await();
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await();
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+// PlanReference#fromFile only supports a local file path; if you need to read from a remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await()
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await()
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> EXECUTE PLAN 'file:///path/to/plan.json';
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 79fbe3fa497e4689165dd81b1d225ea8
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+- SQL Syntax
+
+    ```sql
+    EXECUTE PLAN [IF EXISTS] <plan_file_path>;
+    ```
+    This will deserialize the JSON file and submit an insert statement job.
+
+**A Full Example**
+
+The following table program computes enriched order shipment information.
+It performs a regular inner join with different state TTLs for the left and right side.
+
+- Generate compiled plan
+    ```sql
+    -- left source table
+    CREATE TABLE Orders (
+        `order_id` INT,
+        `line_order_id` INT
+    ) WITH (
+        'connector'='...'
+    );
+    
+    -- right source table
+    CREATE TABLE LineOrders (
+        `line_order_id` INT,
+        `ship_mode` STRING
+    ) WITH (
+        'connector'='...'
+    );
+    
+    -- sink table
+    CREATE TABLE OrdersShipInfo (
+        `order_id` INT,
+        `line_order_id` INT,
+        `ship_mode` STRING
+    ) WITH (
+        'connector' = '...'
+    );
+    
+    COMPILE PLAN '/path/to/plan.json' FOR
+    INSERT INTO OrdersShipInfo
+    SELECT a.order_id, a.line_order_id, b.ship_mode 
+    FROM Orders a JOIN LineOrders b 
+        ON a.line_order_id = b.line_order_id;
+    ```
+    The generated JSON file has the following contents.

Review Comment:
   nit
   ```suggestion
       The generated JSON file has the following contents:
   ```
   ?



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -50,52 +50,477 @@ Due to the declarative nature of Table API & SQL programs, 
it is not always obvi
 state is used within a pipeline. The planner decides whether state is 
necessary to compute a correct
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
-
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
+#### Stateful Operators
 
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries that contain stateful operations such as [joins]({{< ref "docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}),
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in fault-tolerant storage, for which Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+The `word` field is used as the grouping key, and the continuous query writes a count
+for each `word` it observes to the sink.
+The `word` value evolves over time, and since the continuous query never ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query grows continuously as more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="60%">}}
 
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field projections or filters are usually
+stateless pipelines.
+However, in some situations a stateful operation is implicitly derived from the characteristics of the input (*e.g.*, the input is a changelog without *UPDATE_BEFORE*, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#table-to-stream-conversion)),
+or through user configuration (see [`table.exec.source.cdc-events-duplicate`]({{< ref "docs/dev/table/config" >}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that queries an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kafka (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kafka;
 ```
+The table source only provides messages of type *INSERT*, *UPDATE_AFTER*, and *DELETE*, while the downstream sink requires a complete changelog (including *UPDATE_BEFORE*).
+As a result, although this query itself does not involve explicit stateful computation, the planner still generates a stateful operator called "ChangelogNormalize" to obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="60%">}}
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a `word` would be removed as soon 
as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
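+
+As a sketch, the retention can be set globally from the SQL CLI before the query is submitted (the one-hour value is illustrative):
+
+```sql
+-- applies to all stateful operators of the pipeline
+SET 'table.exec.state.ttl' = '1 h';
+```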
+
+#### Configure Operator-level State TTL
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only suitable for cases
+in which the pipeline uses multiple states
+and you need to set a different TTL (time-to-live) for each state.
+If the pipeline does not involve stateful computations, you do not need to follow this procedure.
+If the pipeline uses only one state, you only need to set [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
+at the pipeline level.
+{{< /hint >}}
+
+Starting from Flink v1.18, Table API & SQL supports configuring fine-grained state TTL at the operator level to improve state usage.
+The configurable granularity is one TTL per incoming input edge of each stateful operator.
+Specifically, a `OneInputStreamOperator` can configure the TTL of one state, while a `TwoInputStreamOperator` (such as a regular join), which has two inputs, can configure the TTL of the left and right state separately.
+More generally, for a `MultipleInputStreamOperator` with K inputs, K state TTLs can be configured.
+
+Typical use cases are as follows. 
+- Set different TTLs for the two inputs of [regular joins]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins).
+A regular join generates a `TwoInputStreamOperator` with a left state to keep the left input and a right state to keep the right input. From Flink v1.18,
+you can set different state TTLs for the left and the right state.
+- Set different TTLs for different transformations within one pipeline.
+For example, consider an ETL pipeline which uses `ROW_NUMBER` to perform [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}}),
+and then uses `GROUP BY` to perform [aggregation]({{< ref "docs/dev/table/sql/queries/group-agg" >}}).
+This table program generates two `OneInputStreamOperator`s, each with its own state.
+You can then set different state TTLs for the deduplication state and the aggregation state (see the sketch after this list).
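+
+A minimal sketch of such a two-stage pipeline is shown below; the table names, the `proctime` time attribute, and the schemas are hypothetical:
+
+```sql
+-- stage 1: deduplicate page views per (page_id, user_id) via ROW_NUMBER
+-- stage 2: aggregate the deduplicated rows per page_id
+INSERT INTO uv_per_page
+SELECT page_id, COUNT(*) AS uv
+FROM (
+    SELECT page_id, user_id
+    FROM (
+        SELECT *,
+            ROW_NUMBER() OVER (PARTITION BY page_id, user_id ORDER BY proctime ASC) AS row_num
+        FROM page_views
+    )
+    WHERE row_num = 1
+)
+GROUP BY page_id;
+```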
+
+{{< hint info >}}
+Window-based operations (like [Window Join]({{< ref 
"docs/dev/table/sql/queries/window-join" >}}), [Window Aggregation]({{< ref 
"docs/dev/table/sql/queries/window-agg" >}}), [Window Top-N]({{< ref 
"docs/dev/table/sql/queries/window-topn" >}}) *etc.*) and [Interval Joins]({{< 
ref "docs/dev/table/sql/queries/joins" >}}#interval-joins) do not rely on 
`table.exec.state.ttl` to control the state retention, and their states cannot 
be configured at operator-level.
+
+{{< /hint >}}
+
+**Generate a Compiled Plan**
+
+The setup process begins with generating a JSON file via the `COMPILE PLAN` statement;
+the file represents the serialized execution plan of the current table program.
+{{< hint info >}}
+Currently, the `COMPILE PLAN` statement does not support `SELECT ... FROM ...` queries.
+{{< /hint >}}
+
+- Run a `COMPILE PLAN` statement
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompiledPlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompiledPlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL Syntax
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR 
<insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    This will generate a JSON file at `/path/to/plan.json`.
+
+{{< hint info >}}
+The `COMPILE PLAN` statement supports writing the plan to a remote [filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) scheme such as `hdfs://` or `s3://`.
+Please make sure that you have write access to the target path.
+{{< /hint >}}
+
+**Modify the Compiled Plan**
+
+Every operator that uses state explicitly generates a JSON array named "state" with the following structure.
+In theory, an operator with k inputs has k state entries.
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+Locate the operator for which you need to set the TTL, change the TTL to a positive integer (note that the time unit is milliseconds),
+save the file, and then use the `EXECUTE PLAN` statement to submit your job.
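+
+For example, a sketch of a modified "state" array for a regular join operator that keeps the left state for one day and the right state for one hour (the TTL values are illustrative):
+
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "86400000 ms",
+      "name": "leftState"
+    },
+    {
+      "index": 1,
+      "ttl": "3600000 ms",
+      "name": "rightState"
+    }
+  ]
+```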
+
+
+{{< hint info >}}
+Conceptually, the TTL of a downstream stateful operator should be greater than or equal to the TTL of an upstream stateful operator.
+{{< /hint >}}
+
+**Execute the Compiled Plan**
+
+The `EXECUTE PLAN` statement deserializes the specified file back into the execution plan of the table program and then submits the job.
+A job submitted via the `EXECUTE PLAN` statement applies the state TTLs read from the file instead of the configuration `table.exec.state.ttl`.
+
+- Run an `EXECUTE PLAN` statement
+{{< tabs "execute-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// PlanReference#fromFile only supports a local file path; if you need to read from a remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await();
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await();
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+// PlanReference#fromFile only supports a local file path; if you need to read from a remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await()
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await()
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> EXECUTE PLAN 'file:///path/to/plan.json';
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 79fbe3fa497e4689165dd81b1d225ea8
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+- SQL Syntax
+
+    ```sql
+    EXECUTE PLAN [IF EXISTS] <plan_file_path>;
+    ```
+    This will deserialize the JSON file and submit an insert statement job.
+
+**A Full Example**
+
+The following table program computes enriched order shipment information.
+It performs a regular inner join with different state TTLs for the left and right side.
+
+- Generate compiled plan
+    ```sql
+    -- left source table
+    CREATE TABLE Orders (
+        `order_id` INT,
+        `line_order_id` INT
+    ) WITH (
+        'connector'='...'
+    );
+    
+    -- right source table
+    CREATE TABLE LineOrders (
+        `line_order_id` INT,
+        `ship_mode` STRING
+    ) WITH (
+        'connector'='...'
+    );
+    
+    -- sink table
+    CREATE TABLE OrdersShipInfo (
+        `order_id` INT,
+        `line_order_id` INT,
+        `ship_mode` STRING
+    ) WITH (
+        'connector' = '...'
+    );
+    
+    COMPILE PLAN '/path/to/plan.json' FOR
+    INSERT INTO OrdersShipInfo
+    SELECT a.order_id, a.line_order_id, b.ship_mode 
+    FROM Orders a JOIN LineOrders b 
+        ON a.line_order_id = b.line_order_id;
+    ```
+    The generated JSON file has the following contents.
+
+    ```json
+    {
+      "flinkVersion" : "1.18",
+      "nodes" : [ {
+        "id" : 1,
+        "type" : "stream-exec-table-source-scan_1",
+        "scanTableSource" : {
+          "table" : {
+            "identifier" : "`default_catalog`.`default_database`.`Orders`",
+            "resolvedTable" : { ... }
+          }
+        },
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT>",
+        "description" : "TableSourceScan(table=[[default_catalog, 
default_database, Orders]], fields=[order_id, line_order_id])",
+        "inputProperties" : [ ]
+      }, {
+        "id" : 2,
+        "type" : "stream-exec-exchange_1",
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT>",
+        "description" : "Exchange(distribution=[hash[line_order_id]])"
+      }, {
+        "id" : 3,
+        "type" : "stream-exec-table-source-scan_1",
+        "scanTableSource" : {
+          "table" : {
+            "identifier" : "`default_catalog`.`default_database`.`LineOrders`",
+            "resolvedTable" : {...}
+          }
+        },
+        "outputType" : "ROW<`line_order_id` INT, `ship_mode` 
VARCHAR(2147483647)>",
+        "description" : "TableSourceScan(table=[[default_catalog, 
default_database, LineOrders]], fields=[line_order_id, ship_mode])",
+        "inputProperties" : [ ]
+      }, {
+        "id" : 4,
+        "type" : "stream-exec-exchange_1",
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`line_order_id` INT, `ship_mode` 
VARCHAR(2147483647)>",
+        "description" : "Exchange(distribution=[hash[line_order_id]])"
+      }, {
+        "id" : 5,
+        "type" : "stream-exec-join_2",
+        "joinSpec" : { ... },
+        "state" : [ {
+          "index" : 0,
+          "ttl" : "0 ms",
+          "name" : "leftState"
+        }, {
+          "index" : 1,
+          "ttl" : "0 ms",
+          "name" : "rightState"
+        } ],
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT, 
`line_order_id0` INT, `ship_mode` VARCHAR(2147483647)>",
+        "description" : "Join(joinType=[InnerJoin], where=[(line_order_id = 
line_order_id0)], select=[order_id, line_order_id, line_order_id0, ship_mode], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])"
+      }, {
+        "id" : 6,
+        "type" : "stream-exec-calc_1",
+        "projection" : [ ... ],
+        "condition" : null,
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `ship_mode` 
VARCHAR(2147483647)>",
+        "description" : "Calc(select=[order_id, line_order_id, ship_mode])"
+      }, {
+        "id" : 7,
+        "type" : "stream-exec-sink_2",
+        "configuration" : { ... },
+        "dynamicTableSink" : {
+          "table" : {
+            "identifier" : 
"`default_catalog`.`default_database`.`OrdersShipInfo`",
+            "resolvedTable" : { ... }
+          }
+        },
+        "inputChangelogMode" : [ "INSERT" ],
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `ship_mode` 
VARCHAR(2147483647)>",
+        "description" : 
"Sink(table=[default_catalog.default_database.OrdersShipInfo], 
fields=[order_id, line_order_id, ship_mode])"
+      } ],
+      "edges" : [ ... ]
+    }
+    ```
+
+- Modify the plan content and execute plan
+
+    The JSON representation for join operator's state has the following 
structure.

Review Comment:
   nit
   ```suggestion
       The JSON representation for join operator's state has the following 
structure:
   ```



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -56,37 +56,449 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
+#### 状态算子
 
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})或[去重]({{< ref 
"docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化。由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,因此总状态量会随着 
`word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="60%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#table-to-stream-conversion))或配置(参考 [`table.exec.source.cdc-events-duplicate`]({{< ref "docs/dev/table/config" >}}#table-exec-source-cdc-events-duplicate)),状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kafka (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kafka;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="60%">}}
 
-`sessionId` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `sessionId` 次数。 
`sessionId` 属性随着时间逐步演变,
-且 `sessionId` 的值只活跃到会话结束(即在有限的时间周期内)。然而连续查询无法得知sessionId的这个性质,
-并且预期每个 `sessionId` 值会在任何时间点上出现。这维护了每个可见的 `sessionId` 值。因此总状态量会随着 `sessionId` 
的发现不断地增长。
+{{< hint info >}}
+请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
+{{< /hint >}}
 
 <a name="idle-state-retention-time"></a>
 
 #### 空闲状态维持时间
 
 *空间状态位置时间*参数 [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl) 

Review Comment:
   nit:
   Seems should be 
   `空闲状态维持时间`?



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -56,37 +56,449 @@ Flink 的 [Table API]({{< ref "docs/dev/table/tableApi" >}}) 
和 [SQL]({{< ref "
 它们的状态取决于用到的操作。
 {{< /hint >}}
 
-形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询的查询语句通常是无状态的管道。 然而诸如 join、
-聚合或去重操作需要在 Flink 抽象的容错存储内保存中间结果。
+#### 状态算子
 
-{{< hint info >}}
-请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
-{{< /hint >}}
+包含诸如[连接]({{< ref "docs/dev/table/sql/queries/joins" >}})、[聚合]({{< ref 
"docs/dev/table/sql/queries/group-agg" >}})或[去重]({{< ref 
"docs/dev/table/sql/queries/deduplication" >}}) 等操作的语句需要在 Flink 抽象的容错存储内保存中间结果。
 
 例如对两个表进行 join 操作的普通 SQL 需要算子保存两个表的全部输入。基于正确的 SQL 语义,运行时假设两表会在任意时间点进行匹配。
 Flink 提供了 [优化窗口和时段 Join 聚合]({{< ref "docs/dev/table/sql/queries/joins" >}}) 
 以利用 [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) 
概念来让保持较小的状态规模。
 
-另一个计算每个会话的点击次数的查询语句的例子如下
+另一个计算词频的例子如下
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+`word` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `word` 次数。
+输入 `word` 的值随时间变化。由于这个查询一直持续,Flink 会为每个 `word` 维护一个中间状态来保存当前词频,因此总状态量会随着 
`word` 的发现不断地增长。
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="60%">}}
 
+形如 `SELECT ... FROM ... WHERE` 这种只包含字段映射或过滤器的查询语句通常是无状态的管道。
+然而在某些情况下,根据输入数据的特征(比如输入表是不带 *UPDATE_BEFORE* 的更新流,参考
+[表到流的转换]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#table-to-stream-conversion))或配置(参考 [`table.exec.source.cdc-events-duplicate`]({{< ref "docs/dev/table/config" >}}#table-exec-source-cdc-events-duplicate)),状态算子可能会被隐式地推导出来。
+
+下面的例子展示了使用 `SELECT ... FROM` 语句查询 [upsert kafka 源表]({{< ref 
"docs/connectors/table/upsert-kafka" >}})。
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kafka (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kafka;
 ```
+源表的消息类型只包含 *INSERT*,*UPDATE_AFTER* 和 *DELETE*,然而下游要求完整的 changelog(包含 
*UPDATE_BEFORE*)。
+所以虽然查询本身没有包含状态计算,但是优化器依然隐式地推导出了一个 ChangelogNormalize 状态算子来生成完整的 changelog。
+{{< img alt="Implicit-derived stateful op" 
src="/fig/table-streaming/implicit-derived-stateful-op.png" width="60%">}}
 
-`sessionId` 是用于分组的键,连续查询(Continuous Query)维护了每个观察到的 `sessionId` 次数。 
`sessionId` 属性随着时间逐步演变,
-且 `sessionId` 的值只活跃到会话结束(即在有限的时间周期内)。然而连续查询无法得知sessionId的这个性质,
-并且预期每个 `sessionId` 值会在任何时间点上出现。这维护了每个可见的 `sessionId` 值。因此总状态量会随着 `sessionId` 
的发现不断地增长。
+{{< hint info >}}
+请参考独立的算子文档来获取更多关于状态需求量和限制潜在增长状态大小的信息。
+{{< /hint >}}
 
 <a name="idle-state-retention-time"></a>
 
 #### 空闲状态维持时间
 
 *空间状态位置时间*参数 [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" 
>}}#table-exec-state-ttl) 
-定义了状态的键在被更新后要保持多长时间才被移除。在之前的查询例子中,`sessionId` 的数目会在配置的时间内未更新时立刻被移除。
+定义了状态的键在被更新后要保持多长时间才被移除。
+在之前的查询例子中,`word` 的数目会在配置的时间内未更新时立刻被移除。
+
+通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是对应键的第一条记录。上述例子中意味着 
`cnt` 会再次从 `0` 开始计数。
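+
+例如,下面的示例(一小时的取值仅作示意)展示了如何在提交查询前,通过 SQL CLI 为整个作业设置空闲状态维持时间:
+
+```sql
+-- 对作业中所有状态算子生效
+SET 'table.exec.state.ttl' = '1 h';
+```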
+
+#### 配置算子粒度的状态 TTL
+{{< hint warning >}}
+这是一个需要小心使用的高级特性。该特性仅适用于作业中使用了多个状态,并且每个状态需要使用不同的 TTL。
+无状态的作业不需要参考下面的操作步骤。
+如果作业中仅使用到一个状态,仅需设置作业级别的 TTL 参数 [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)。
+
+{{< /hint >}}
+
+从 Flink v1.18 开始,Table API & SQL 支持配置细粒度的状态 TTL 
来优化状态使用。最小可配置粒度为每个状态算子的入边数。具体而言,`OneInputStreamOperator` 可以配置一个状态的 TTL,而 
`TwoInputStreamOperator`(例如双流 join)则可以分别为左状态和右状态配置 TTL。更一般地,对于具有 K 个输入的 
`MultipleInputStreamOperator`,可以配置 K 个状态 TTL。
+
+一些典型的使用场景如下
+- 为 [双流 Join]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins) 
的左右流配置不同 TTL。 
+双流 Join 会生成拥有两条输入边的 `TwoInputStreamOperator` 的状态算子,它用到了两个状态,分别来保存来自左流和右流的更新。
+- 在同一个作业中为不同的状态计算设置不同 TTL。
+举例来说,假设一个 ETL 作业使用 `ROW_NUMBER` 进行[去重]({{< ref 
"docs/dev/table/sql/queries/deduplication" >}})操作后,
+紧接着使用 `GROUP BY` 语句进行[聚合]({{< ref "docs/dev/table/sql/queries/group-agg" 
>}})操作。
+该作业会分别生成两个拥有单条输入边的 `OneInputStreamOperator` 状态算子。您可以为去重算子和聚合算子的状态分别设置不同的 TTL(参考此列表后的示例)。
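+
+下面给出这种两阶段作业的一个简单示例(其中表名、`proctime` 属性和表结构均为示意):
+
+```sql
+-- 第一阶段:使用 ROW_NUMBER 按 (page_id, user_id) 去重
+-- 第二阶段:按 page_id 聚合去重后的数据
+INSERT INTO uv_per_page
+SELECT page_id, COUNT(*) AS uv
+FROM (
+    SELECT page_id, user_id
+    FROM (
+        SELECT *,
+            ROW_NUMBER() OVER (PARTITION BY page_id, user_id ORDER BY proctime ASC) AS row_num
+        FROM page_views
+    )
+    WHERE row_num = 1
+)
+GROUP BY page_id;
+```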
+
+{{< hint info >}}
+由于基于窗口的操作(例如[窗口连接]({{< ref "docs/dev/table/sql/queries/window-join" 
>}})、[窗口聚合]({{< ref "docs/dev/table/sql/queries/window-agg" >}})、[窗口 Top-N]({{< 
ref "docs/dev/table/sql/queries/window-topn" >}}) 等)和 [Interval Join]({{< ref 
"docs/dev/table/sql/queries/joins" >}}#interval-joins) 不依赖于 
`table.exec.state.ttl` 来控制状态保留,因此它们的状态无法在算子级别进行配置。
+{{< /hint >}}
+
+**生成 Compiled Plan**
+
+配置过程首先会使用 `COMPILE PLAN` 语句生成一个 JSON 文件,它表示了序列化后的执行计划。
+{{< hint info >}}
+`COMPILE PLAN` 不支持查询语句 `SELECT... FROM...` 。 
+{{< /hint >}}
+
+- 执行 `COMPILE PLAN` 语句
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = 
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)");
+
+// CompiledPlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = 
b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id 
BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, 
...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, 
order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompiledPlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
 
-通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是
-对应键的第一条记录。上述例子中意味着 `sessionId` 会再次从 `0` 开始计数。
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, 
buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status 
TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id 
BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO 
enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL 语法
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR 
<insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    该语句会在指定位置 `/path/to/plan.json` 生成一个 JSON 文件。
+
+{{< hint info >}}
+The `COMPILE PLAN` statement supports writing the plan to any [filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) supported by Flink, such as `hdfs://` or `s3://`.
+Please make sure you have write access to the target path.
+{{< /hint >}}
+
+**Modify the Compiled Plan**
+
+Every stateful operator explicitly generates a JSON array named "state" with the following structure.
+In theory, a stateful operator with k inputs has k states.
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+Locate the stateful operator you need to modify, set the TTL to a positive integer (note that the time unit is milliseconds, e.g. change `"0 ms"` to `"3600000 ms"` for one hour), save the file, and then submit the job via the `EXECUTE PLAN` statement.
+
+{{< hint info >}}
+In theory, the TTL of a downstream stateful operator should not be shorter than that of an upstream stateful operator.
+{{< /hint >}}
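+
+For illustration, a sketch of edited "state" arrays that respects this rule (the state names and TTL values here are illustrative assumptions, not taken from a real plan): the upstream deduplication state is kept for one hour, the downstream aggregation state for one day.
+
+```json
+"state": [ { "index": 0, "ttl": "3600000 ms", "name": "deduplicateState" } ]
+```
+```json
+"state": [ { "index": 0, "ttl": "86400000 ms", "name": "groupAggregateState" } ]
+```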
+
+**Execute the Compiled Plan**
+
+The `EXECUTE PLAN` statement deserializes the above JSON file, generates the JobGraph, and submits the job.
+For jobs submitted via the `EXECUTE PLAN` statement, the TTL values of stateful operators are read from the file, and the configuration option `table.exec.state.ttl` is ignored.
+
+- Run an `EXECUTE PLAN` statement
+{{< tabs "execute-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)");
+
+// PlanReference#fromFile only supports a local file path; if you need to read from a remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await();
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await();
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)")
+
+// PlanReference#fromFile only supports a local file path; if you need to read from a remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await()
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await()
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> EXECUTE PLAN 'file:///path/to/plan.json';
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 79fbe3fa497e4689165dd81b1d225ea8
+```
 
+{{< /tab >}}
+{{< /tabs >}}
+
+- SQL Syntax
+
+    ```sql
+    EXECUTE PLAN [IF EXISTS] <plan_file_path>;
+    ```
+    This statement deserializes the specified JSON file and submits the job.
+
+**A Complete Example**
+
+The following example shows a job that computes enriched order details via a two-stream join, and how to set different TTLs for the left and right streams.
+
+- Generate a compiled plan
+    ```sql
+    -- left source table
+    CREATE TABLE Orders (
+        `order_id` INT,
+        `line_order_id` INT
+    ) WITH (
+        'connector'='...'
+    );
+    
+    -- right source table
+    CREATE TABLE LineOrders (
+        `line_order_id` INT,
+        `ship_mode` STRING
+    ) WITH (
+        'connector'='...'
+    );
+    
+    -- sink table
+    CREATE TABLE OrdersShipInfo (
+        `order_id` INT,
+        `line_order_id` INT,
+        `ship_mode` STRING
+    ) WITH (
+        'connector' = '...'
+    );
+    
+    COMPILE PLAN '/path/to/plan.json' FOR
+    INSERT INTO OrdersShipInfo
+    SELECT a.order_id, a.line_order_id, b.ship_mode 
+    FROM Orders a JOIN LineOrders b 
+        ON a.line_order_id = b.line_order_id;
+    ```
+    The generated JSON file looks like the following.

Review Comment:
   nit
   ```suggestion
       The generated JSON file looks like the following:
   ```



##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries that contain stateful operations such as [joins]({{< ref "docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}),
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in fault-tolerant storage, for which Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a count
+for each `word` it observes to the sink.
+The set of `word` values evolves over time, and because the continuous query never ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query grows continuously as more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" 
src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field projections or filters are usually
+stateless pipelines.
+However, in some situations the stateful operation is implicitly derived from the characteristics of the input (*e.g.*, the input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#table-to-stream-conversion)),
+or through user configuration (see [`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" >}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that queries an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kafka (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kafka;
+```
+The table source only provides messages of type *INSERT*, *UPDATE_AFTER*, and *DELETE*, while the downstream sink requires a complete changelog (including *UPDATE_BEFORE*).
+As a result, although the query itself does not involve explicit stateful computation, the planner still generates a stateful operator called "ChangelogNormalize" to obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated before it is removed.
-For the previous example query, the count of a `sessionId` would be removed as soon as it has not
+For the previous example query, the count of a `word` would be removed as soon as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
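+
+For illustration, a minimal sketch of setting this parameter at the pipeline level (assuming a streaming `TableEnvironment` named `tableEnv`; the one-hour value is only an example, not a recommendation):
+
+```java
+// retain idle state for one hour before it is removed;
+// "1 h" is an illustrative value
+tableEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "1 h");
+```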
+
+#### Configure Operator-level State TTL
+--------------------------
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only suitable for cases
+in which a pipeline uses multiple states
+and you need to set a different TTL (time-to-live) for each state.
+If the pipeline does not involve stateful computations, you do not need to follow this procedure.
+If the pipeline uses only one state, you only need to set [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
+at the pipeline level.
+{{< /hint >}}
+
+Starting from Flink v1.18, Table API & SQL supports configuring fine-grained state TTL at the operator level to improve state usage.
+To be more specific, the configurable granularity is each input state of an operator: a `OneInputStreamOperator` has one configurable state TTL, while a `TwoInputStreamOperator` (e.g., a regular join) can configure the TTL of its left and right state separately.
+
+Typical use cases are as follows.
+- Set different TTLs for the two inputs of a [regular join]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins).
+A regular join generates a `TwoInputStreamOperator` with a left state to keep the left input and a right state to keep the right input. From Flink v1.18,
+you can set different state TTLs for the left and the right state.
+- Set different TTLs for different transformations within one pipeline.
+For example, consider an ETL pipeline that uses `ROW_NUMBER` to perform [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+and then uses `GROUP BY` to perform [aggregation]({{< ref "docs/dev/table/sql/queries/group-agg" >}}).
+This table program generates two `OneInputStreamOperator`s, each with its own state.
+You can now set different state TTLs for the deduplication state and the aggregation state; a sketch of such a pipeline follows this list.
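+
+A minimal SQL sketch of this second use case (all table and column names are illustrative assumptions; `proctime` is assumed to be a processing-time attribute of `user_log`):
+
+```sql
+-- the deduplication keeps one state; the downstream aggregation keeps another
+INSERT INTO user_cnt
+SELECT user_id, COUNT(1) AS cnt
+FROM (
+    SELECT *, ROW_NUMBER() OVER (PARTITION BY log_id ORDER BY proctime ASC) AS rn
+    FROM user_log
+) WHERE rn = 1
+GROUP BY user_id;
+```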
+
+**Generate a Compiled Plan**
+
+The setup process begins by generating a JSON file using the `COMPILE PLAN` 
statement, 
+which represents the serialized execution plan of the current table program. 
+{{< hint info >}}
+Currently, the `COMPILE PLAN` statement does not support `SELECT ... FROM ...` queries.
+{{< /hint >}}
+
+- Run a `COMPILE PLAN` statement
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL Syntax
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file> FOR <insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    This will generate a JSON file at `/path/to/plan.json`.
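+
+    For illustration, a sketch of compiling a statement set under the grammar above (the table names are assumptions):
+
+    ```sql
+    COMPILE PLAN 'file:///path/to/plan.json' FOR
+    EXECUTE STATEMENT SET
+    BEGIN
+    INSERT INTO sink_a SELECT * FROM source_a;
+    INSERT INTO sink_b SELECT * FROM source_b;
+    END;
+    ```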
+
+{{< hint info >}}
+The `COMPILE PLAN` statement supports writing the plan to a remote [filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) scheme like `hdfs://` or `s3://`.
+Please make sure you have write access to the target path.
+{{< /hint >}}
+
+**Modify the Compiled Plan**
+
+Every operator that uses state explicitly generates a JSON array named "state" with the following structure.
+In theory, a stateful operator with k inputs has k states.
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+Locate the operator for which you need to set the TTL, modify the TTL to a positive integer (note that the time unit is milliseconds),

Review Comment:
   Got it. But it may confuse users, since we said `modify the TTL to a positive integer (note that the time unit is milliseconds)`.
   I'm afraid users may mistake this as changing `ttl` to `"ttl": "1000"`. AFAIK, it is indeed a positive integer and, as said, the time unit is milliseconds, so I'd be inclined to think we can omit the time unit.



##########
docs/content.zh/docs/dev/table/concepts/overview.md:
##########
@@ -56,37 +56,449 @@ Flink's [Table API]({{< ref "docs/dev/table/tableApi" >}}) and [SQL]({{< ref "
 Their state depends on the operations used.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` that only consist of field projections or filters are usually stateless pipelines. However, operations such as joins,
-aggregations, or deduplication need to keep intermediate results in Flink's fault-tolerant state storage.
+#### Stateful Operators

-{{< hint info >}}
-Please refer to the individual operator documentation for more details about how much state is required and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Statements that contain operations such as [joins]({{< ref "docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}), or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}}) need to keep intermediate results in Flink's fault-tolerant state storage.
 
 For example, a regular SQL join of two tables requires the operator to keep the entire input of both tables in state. For correct SQL semantics, the runtime assumes that a match between the two tables may occur at any point in time.
 Flink provides [optimized window and interval joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that exploit the concept of [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}) to keep the state size small.

-Another example is the following query that computes the number of clicks per session.
+Another example is the following query that computes a word count.
+
+```sql
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
+```
+
+The `word` field is used as a grouping key, and the continuous query maintains a count for each `word` it observes.
+The observed `word` values change over time. Since the query runs continuously, Flink maintains an intermediate state for each `word` to hold its current count, so the total state size grows as more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" src="/fig/table-streaming/explicit-derived-stateful-op.png" width="60%">}}

+Queries such as `SELECT ... FROM ... WHERE` that only consist of field projections or filters are usually stateless pipelines.
+However, in some situations a stateful operator is implicitly derived from the characteristics of the input (e.g., the input is an updating stream without *UPDATE_BEFORE*, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#table-to-stream-conversion)) or from configuration (see [`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" >}}#table-exec-source-cdc-events-duplicate)).
+
+The following example shows a `SELECT ... FROM` statement that queries an [upsert kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE upsert_kafka (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kafka;
 ```
+The source table only provides messages of type *INSERT*, *UPDATE_AFTER*, and *DELETE*, while the downstream requires a complete changelog (including *UPDATE_BEFORE*).
+Therefore, although the query itself does not involve stateful computation, the planner still implicitly derives a stateful "ChangelogNormalize" operator to produce the complete changelog.
+{{< img alt="Implicit-derived stateful op" src="/fig/table-streaming/implicit-derived-stateful-op.png" width="60%">}}
 
-The `sessionId` attribute is used as a grouping key, and the continuous query maintains a count for each observed `sessionId`. The `sessionId` attribute evolves over time,
-and `sessionId` values are only active until the session ends, i.e., for a limited period of time. However, the continuous query cannot know about this property of `sessionId`
-and expects every `sessionId` value to occur at any point in time. It maintains a count for each observed `sessionId` value, so the total state size of the query grows continuously as more `sessionId` values are observed.
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about how much state is required and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 <a name="idle-state-retention-time"></a>
 
 #### Idle State Retention Time

 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
-defines how long the state of a key is retained without being updated before it is removed. For the previous example query, the count of a `sessionId` would be removed as soon as it has not been updated for the configured period of time.
+defines how long the state of a key is retained without being updated before it is removed.
+For the previous example query, the count of a `word` is removed as soon as it has not been updated for the configured period of time.
+
+By removing a key from state, the continuous query completely forgets that it has ever seen this key. If a record whose key was previously removed from state is processed, the record is treated as the first record with that key. For the example above, this means the `cnt` would start counting again from `0`.
+
+#### Configure Operator-level State TTL
+--------------------------
+{{< hint warning >}}
+This is an advanced feature that should be used with caution. It is only suitable for jobs that use multiple states, each of which needs a different TTL.
+Stateless jobs do not need to follow the steps below.
+If a job uses only one state, simply set the job-level TTL parameter [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl).
+
+{{< /hint >}}
+
+Starting from Flink v1.18, Table API & SQL supports configuring fine-grained state TTL to optimize state usage.
+The smallest configurable granularity is the number of input edges of each stateful operator. Specifically, a `OneInputStreamOperator` can configure the TTL of one state, while a `TwoInputStreamOperator` (such as a two-stream join) can configure the TTLs of the left and right state separately. More generally, a `MultipleInputStreamOperator` with K inputs can configure K state TTLs.
+
+Typical use cases are as follows.
+- Set different TTLs for the left and right streams of a [two-stream join]({{< ref "docs/dev/table/sql/queries/joins" >}}#regular-joins).
+A two-stream join generates a stateful `TwoInputStreamOperator` with two input edges; it uses two states to keep the updates from the left and right streams respectively.
+- Set different TTLs for different stateful computations within one job.
+For example, suppose an ETL job uses `ROW_NUMBER` to perform [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+followed by a `GROUP BY` statement to perform [aggregation]({{< ref "docs/dev/table/sql/queries/group-agg" >}}).
+The job generates two stateful `OneInputStreamOperator`s, each with a single input edge. You can set different TTLs for the states of the deduplication and aggregation operators.
+
+{{< hint info >}}
+Since window-based operations (such as [window joins]({{< ref "docs/dev/table/sql/queries/window-join" >}}), [window aggregations]({{< ref "docs/dev/table/sql/queries/window-agg" >}}), [window Top-N]({{< ref "docs/dev/table/sql/queries/window-topn" >}}), etc.) and [interval joins]({{< ref "docs/dev/table/sql/queries/joins" >}}#interval-joins) do not rely on `table.exec.state.ttl` to control state retention, their state cannot be configured at the operator level.
+{{< /hint >}}
+
+**Generate a Compiled Plan**
+
+The configuration process starts by generating a JSON file via the `COMPILE PLAN` statement, which represents the serialized execution plan.
+{{< hint info >}}
+The `COMPILE PLAN` statement does not support `SELECT ... FROM ...` queries.
+{{< /hint >}}
+
+- Run a `COMPILE PLAN` statement
+
+{{< tabs "compile-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)");
+
+// CompilePlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+CompiledPlan compiledPlan = 
+    tableEnv.compilePlanSql(
+        "INSERT INTO enriched_orders \n" 
+       + "SELECT a.order_id, a.order_line_id, b.order_status, ... \n" 
+       + "FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id");
+
+compiledPlan.writeToFile("/path/to/plan.json");
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)")
+
+val compiledPlan = 
+    tableEnv.compilePlanSql(
+       """
+        |INSERT INTO enriched_orders
+        |SELECT a.order_id, a.order_line_id, b.order_status, ...
+        |FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id
+        |""".stripMargin)
+// CompilePlan#writeToFile only supports a local file path; if you need to write to a remote filesystem,
+// please use tableEnv.executeSql("COMPILE PLAN 'hdfs://path/to/plan.json' FOR ...")
+compiledPlan.writeToFile("/path/to/plan.json")
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
 
-通过移除状态的键,连续查询会完全忘记它曾经见过这个键。如果一个状态带有曾被移除状态的键被处理了,这条记录将被认为是
-对应键的第一条记录。上述例子中意味着 `sessionId` 会再次从 `0` 开始计数。
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> COMPILE PLAN 'file:///path/to/plan.json' FOR INSERT INTO enriched_orders
+> SELECT a.order_id, a.order_line_id, b.order_status, ...
+> FROM orders a JOIN line_orders b ON a.order_line_id = b.order_line_id;
+[INFO] Execute statement succeed.
+```
+
+{{< /tab >}}
+{{< /tabs >}}
+
+
+- SQL Syntax
+
+    ```sql
+    COMPILE PLAN [IF NOT EXISTS] <plan_file_path> FOR <insert_statement>|<statement_set>;
+    
+    statement_set:
+        EXECUTE STATEMENT SET
+        BEGIN
+        insert_statement;
+        ...
+        insert_statement;
+        END;
+    
+    insert_statement:
+        <insert_from_select>|<insert_from_values>
+    ```
+    This statement generates a JSON file at the specified location `/path/to/plan.json`.
+
+{{< hint info >}}
+The `COMPILE PLAN` statement supports writing the plan to any [filesystem]({{< ref "docs/deployment/filesystems/overview" >}}) supported by Flink, such as `hdfs://` or `s3://`.
+Please make sure you have write access to the target path.
+{{< /hint >}}
+
+**Modify the Compiled Plan**
+
+Every stateful operator explicitly generates a JSON array named "state" with the following structure.
+In theory, a stateful operator with k inputs has k states.
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "0 ms",
+      "name": "${1st input state name}"
+    },
+    {
+      "index": 1,
+      "ttl": "0 ms",
+      "name": "${2nd input state name}"
+    },
+    ...
+  ]
+```
+Locate the stateful operator you need to modify, set the TTL to a positive integer (note that the time unit is milliseconds, e.g. change `"0 ms"` to `"3600000 ms"` for one hour), save the file, and then submit the job via the `EXECUTE PLAN` statement.
+
+{{< hint info >}}
+In theory, the TTL of a downstream stateful operator should not be shorter than that of an upstream stateful operator.
+{{< /hint >}}
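+
+For illustration, a sketch of a two-stream join operator's "state" array after editing (the TTL values are illustrative assumptions; the `leftState`/`rightState` names match the generated plan shown in the complete example below): the left state is kept for one day and the right state for twelve hours.
+
+```json
+"state": [
+    {
+      "index": 0,
+      "ttl": "86400000 ms",
+      "name": "leftState"
+    },
+    {
+      "index": 1,
+      "ttl": "43200000 ms",
+      "name": "rightState"
+    }
+  ]
+```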
+
+**Execute the Compiled Plan**
+
+The `EXECUTE PLAN` statement deserializes the above JSON file, generates the JobGraph, and submits the job.
+For jobs submitted via the `EXECUTE PLAN` statement, the TTL values of stateful operators are read from the file, and the configuration option `table.exec.state.ttl` is ignored.
+
+- Run an `EXECUTE PLAN` statement
+{{< tabs "execute-plan" >}}
+{{< tab "Java" >}}
+```java
+TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)");
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)");
+
+// PlanReference#fromFile only supports a local file path; if you need to read from a remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await();
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await();
+
+```
+
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())
+tableEnv.executeSql(
+    "CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...)")
+tableEnv.executeSql(
+    "CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...)")
+
+// PlanReference#fromFile only supports a local file path; if you need to read from a remote filesystem,
+// please use tableEnv.executeSql("EXECUTE PLAN 'hdfs://path/to/plan.json'").await()
+tableEnv.loadPlan(PlanReference.fromFile("/path/to/plan.json")).execute().await()
+```
+{{< /tab >}}
+{{< tab "SQL CLI" >}}
+
+```sql
+Flink SQL> CREATE TABLE orders (order_id BIGINT, order_line_id BIGINT, buyer_id BIGINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE line_orders (order_line_id BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE enriched_orders (order_id BIGINT, order_line_id BIGINT, order_status TINYINT, ...);
+[INFO] Execute statement succeed.
+
+Flink SQL> EXECUTE PLAN 'file:///path/to/plan.json';
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 79fbe3fa497e4689165dd81b1d225ea8
+```
 
+{{< /tab >}}
+{{< /tabs >}}
+
+- SQL Syntax
+
+    ```sql
+    EXECUTE PLAN [IF EXISTS] <plan_file_path>;
+    ```
+    This statement deserializes the specified JSON file and submits the job.
+
+**A Complete Example**
+
+The following example shows a job that computes enriched order details via a two-stream join, and how to set different TTLs for the left and right streams.
+
+- Generate a compiled plan
+    ```sql
+    -- left source table
+    CREATE TABLE Orders (
+        `order_id` INT,
+        `line_order_id` INT
+    ) WITH (
+        'connector'='...'
+    );
+    
+    -- right source table
+    CREATE TABLE LineOrders (
+        `line_order_id` INT,
+        `ship_mode` STRING
+    ) WITH (
+        'connector'='...'
+    );
+    
+    -- sink table
+    CREATE TABLE OrdersShipInfo (
+        `order_id` INT,
+        `line_order_id` INT,
+        `ship_mode` STRING
+    ) WITH (
+        'connector' = '...'
+    );
+    
+    COMPILE PLAN '/path/to/plan.json' FOR
+    INSERT INTO OrdersShipInfo
+    SELECT a.order_id, a.line_order_id, b.ship_mode 
+    FROM Orders a JOIN LineOrders b 
+        ON a.line_order_id = b.line_order_id;
+    ```
+    The generated JSON file looks like the following.
+
+    ```json
+    {
+      "flinkVersion" : "1.18",
+      "nodes" : [ {
+        "id" : 1,
+        "type" : "stream-exec-table-source-scan_1",
+        "scanTableSource" : {
+          "table" : {
+            "identifier" : "`default_catalog`.`default_database`.`Orders`",
+            "resolvedTable" : { ... }
+          }
+        },
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT>",
+        "description" : "TableSourceScan(table=[[default_catalog, 
default_database, Orders]], fields=[order_id, line_order_id])",
+        "inputProperties" : [ ]
+      }, {
+        "id" : 2,
+        "type" : "stream-exec-exchange_1",
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT>",
+        "description" : "Exchange(distribution=[hash[line_order_id]])"
+      }, {
+        "id" : 3,
+        "type" : "stream-exec-table-source-scan_1",
+        "scanTableSource" : {
+          "table" : {
+            "identifier" : "`default_catalog`.`default_database`.`LineOrders`",
+            "resolvedTable" : {...}
+          }
+        },
+        "outputType" : "ROW<`line_order_id` INT, `ship_mode` 
VARCHAR(2147483647)>",
+        "description" : "TableSourceScan(table=[[default_catalog, 
default_database, LineOrders]], fields=[line_order_id, ship_mode])",
+        "inputProperties" : [ ]
+      }, {
+        "id" : 4,
+        "type" : "stream-exec-exchange_1",
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`line_order_id` INT, `ship_mode` 
VARCHAR(2147483647)>",
+        "description" : "Exchange(distribution=[hash[line_order_id]])"
+      }, {
+        "id" : 5,
+        "type" : "stream-exec-join_2",
+        "joinSpec" : { ... },
+        "state" : [ {
+          "index" : 0,
+          "ttl" : "0 ms",
+          "name" : "leftState"
+        }, {
+          "index" : 1,
+          "ttl" : "0 ms",
+          "name" : "rightState"
+        } ],
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT, 
`line_order_id0` INT, `ship_mode` VARCHAR(2147483647)>",
+        "description" : "Join(joinType=[InnerJoin], where=[(line_order_id = 
line_order_id0)], select=[order_id, line_order_id, line_order_id0, ship_mode], 
leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])"
+      }, {
+        "id" : 6,
+        "type" : "stream-exec-calc_1",
+        "projection" : [ ... ],
+        "condition" : null,
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `ship_mode` 
VARCHAR(2147483647)>",
+        "description" : "Calc(select=[order_id, line_order_id, ship_mode])"
+      }, {
+        "id" : 7,
+        "type" : "stream-exec-sink_2",
+        "configuration" : { ... },
+        "dynamicTableSink" : {
+          "table" : {
+            "identifier" : 
"`default_catalog`.`default_database`.`OrdersShipInfo`",
+            "resolvedTable" : { ... }
+          }
+        },
+        "inputChangelogMode" : [ "INSERT" ],
+        "inputProperties" : [ ... ],
+        "outputType" : "ROW<`order_id` INT, `line_order_id` INT, `ship_mode` 
VARCHAR(2147483647)>",
+        "description" : 
"Sink(table=[default_catalog.default_database.OrdersShipInfo], 
fields=[order_id, line_order_id, ship_mode])"
+      } ],
+      "edges" : [ ... ]
+    }
+    ```
+
+- Modify and execute the compiled plan
+
+    The following JSON snippet represents the state information of the join operator.

Review Comment:
   nit
   ```suggestion
       The following JSON snippet represents the state information of the join operator:
   ```
   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

