LadyForest commented on code in PR #22837:
URL: https://github.com/apache/flink/pull/22837#discussion_r1241186761


##########
docs/content/docs/dev/table/concepts/overview.md:
##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides 
whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the 
current set of optimizer
 rules.
 
+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer 
deals with logical tables
 (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). 
Their state requirements
 depend on the used operations.
 {{< /hint >}}
 
-Queries such as `SELECT ... FROM ... WHERE` which only consist of field 
projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or 
deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state 
abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about 
how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries that contain stateful operations such as [joins]({{< ref "docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}),
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in fault-tolerant storage, for which Flink's state abstractions are used.
 
 For example, a regular SQL join of two tables requires the operator to keep 
both input tables in state
 entirely. For correct SQL semantics, the runtime needs to assume that a 
matching could occur at any
 point in time from both sides. Flink provides [optimized window and interval 
joins]({{< ref "docs/dev/table/sql/queries/joins" >}})
 that aim to keep the state size small by exploiting the concept of 
[watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).
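+
+For illustration, the following interval join is a minimal sketch of this idea (the `orders` and `shipments` tables, their time attributes, and the four-hour bound are hypothetical):
+
+```sql
+-- With an interval join, a row only needs to be kept in state until the
+-- watermark passes the interval bound, so the state does not grow forever.
+SELECT o.id, o.order_time, s.ship_time
+FROM orders o, shipments s
+WHERE o.id = s.order_id
+  AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;
+```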
 
-Another example is the following query that computes the number of clicks per 
session.
+Another example is the following query that computes the word count.
 
 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+    word STRING
+) WITH (
+    'connector' = '...'
+);
+CREATE TABLE word_cnt (
+    word STRING PRIMARY KEY NOT ENFORCED,
+    cnt  BIGINT
+) WITH (
+    'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```
 
-The `sessionId` attribute is used as a grouping key and the continuous query 
maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over 
time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of 
time. However, the
-continuous query cannot know about this property of `sessionId` and expects 
that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed 
`sessionId` value.
-Consequently, the total state size of the query is continuously growing as 
more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a count
+for each `word` it observes to the sink.
+The `word` value evolves over time, and because the continuous query never ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query grows continuously as more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field projections or filters are usually
+stateless pipelines.
+However, in some situations the stateful operation is implicitly derived from the characteristics of the input (*e.g.*, the input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#table-to-stream-conversion)),
+or through user configuration (see [`table.exec.source.cdc-events-duplicate`]({{< ref "docs/dev/table/config" >}}#table-exec-source-cdc-events-duplicate)).
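+
+For the configuration-driven case, a minimal sketch (assuming a CDC source table that declares a primary key):
+
+```sql
+-- Asking the framework to de-duplicate possibly repeated CDC events implicitly
+-- introduces a stateful de-duplication operator, even for a simple SELECT.
+SET 'table.exec.source.cdc-events-duplicate' = 'true';
+```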
+
+The following figure illustrates a `SELECT ... FROM` statement that queries an [Upsert Kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kafka (
+    id INT PRIMARY KEY NOT ENFORCED,
+    message  STRING
+) WITH (
+    'connector' = 'upsert-kafka',
+    ...
+);
+
+SELECT * FROM upsert_kafka;
+```
+The table source only provides messages of the *INSERT*, *UPDATE_AFTER* and *DELETE* types, while the downstream sink requires a complete changelog (including *UPDATE_BEFORE*).
+As a result, although the query itself does not involve an explicit stateful computation, the planner still generates a stateful operator called "ChangelogNormalize" to obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
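+
+Whether such an operator is generated can be checked by inspecting the plan with `EXPLAIN` (the exact output depends on the Flink version):
+
+```sql
+-- For the query above, the optimized plan typically contains a ChangelogNormalize
+-- node even though the statement has no join, aggregation, or deduplication.
+EXPLAIN SELECT * FROM upsert_kafka;
+```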
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about 
how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}
 
 #### Idle State Retention Time
 
 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref 
"docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated 
before it is removed.
-For the previous example query, the count of a`sessionId` would be removed as 
soon as it has not
+For the previous example query, the count of a `word` would be removed as soon as it has not
 been updated for the configured period of time.
 
 By removing the state of a key, the continuous query completely forgets that 
it has seen this key
 before. If a record with a key, whose state has been removed before, is 
processed, the record will
 be treated as if it was the first record with the respective key. For the 
example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
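+
+For example, a pipeline-wide retention time can be set before submitting the query; a minimal sketch (the one-hour value is just an illustration):
+
+```sql
+-- The count for a word is dropped once it has not been updated for one hour.
+SET 'table.exec.state.ttl' = '1 h';
+```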
+
+#### Configure Operator-level State TTL
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only suitable for cases
+in which the pipeline uses multiple states
+and a different TTL (time-to-live) needs to be set for each state.
+If the pipeline does not involve stateful computations, you do not need to follow this procedure.
+If the pipeline only uses one state, you only need to set [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
+at the pipeline level.
+{{< /hint >}}
+
+Starting from Flink v1.18, Table API & SQL supports configuring fine-grained state TTL at the operator level to optimize state usage.
+To be more specific, the TTL can be configured separately for each state that an operator maintains for one of its inputs.
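+
+As a rough sketch of how this can be applied (assuming the compiled-plan workflow and an illustrative file path; the exact JSON layout depends on the Flink version), the pipeline is first serialized, the generated per-state TTL metadata is edited, and the modified plan is then submitted:
+
+```sql
+-- Serialize the statement into a JSON plan; its exec nodes carry state metadata
+-- whose TTL values can be edited per state before submission.
+COMPILE PLAN 'file:///tmp/word_cnt_plan.json' FOR
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt FROM doc GROUP BY word;
+
+-- After adjusting the TTL values in the JSON file, run the modified plan.
+EXECUTE PLAN 'file:///tmp/word_cnt_plan.json';
+```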

Review Comment:
   Due to the one-to-many relationship between the operator and its state, we 
define the configurable minimum granularity as the number of incoming edges for 
each state operator. For OneInputStreamOperator, it can configure one state's 
TTL; for TwoInputStreamOperator (such as Join), which has two inputs, it can 
configure TTL for the left state and the right state, respectively. More 
generally, for KInputStreamOperator, K state TTLs can be configured. Assuming 
that a job uses two stateful OneInputStreamOperators and one stateful 
TwoInputStreamOperator, the total number of states that can be configured is 1 
* 2 + 2 = 4.


