LadyForest commented on code in PR #22837: URL: https://github.com/apache/flink/pull/22837#discussion_r1241186761
########## docs/content/docs/dev/table/concepts/overview.md: ##########
@@ -51,51 +51,470 @@ state is used within a pipeline. The planner decides whether state is necessary
 result. A pipeline is optimized to claim as little state as possible given the current set of
 optimizer rules.

+#### Stateful Operators
+
 {{< hint info >}}
 Conceptually, source tables are never kept entirely in state. An implementer deals with logical
 tables (i.e. [dynamic tables]({{< ref "docs/dev/table/concepts/dynamic_tables" >}})). Their state
 requirements depend on the used operations.
 {{< /hint >}}

-Queries such as `SELECT ... FROM ... WHERE` which only consist of field projections or filters are usually
-stateless pipelines. However, operations such as joins, aggregations, or deduplications require keeping
-intermediate results in a fault-tolerant storage for which Flink's state abstractions are used.
-
-{{< hint info >}}
-Please refer to the individual operator documentation for more details about how much state is required
-and how to limit a potentially ever-growing state size.
-{{< /hint >}}
+Queries that contain stateful operations such as [joins]({{< ref "docs/dev/table/sql/queries/joins" >}}), [aggregations]({{< ref "docs/dev/table/sql/queries/group-agg" >}}),
+or [deduplication]({{< ref "docs/dev/table/sql/queries/deduplication" >}})
+require keeping intermediate results in fault-tolerant storage, for which Flink's state abstractions are used.

 For example, a regular SQL join of two tables requires the operator to keep both input tables in
 state entirely. For correct SQL semantics, the runtime needs to assume that a matching could occur
 at any point in time from both sides. Flink provides [optimized window and interval
 joins]({{< ref "docs/dev/table/sql/queries/joins" >}}) that aim to keep the state size small by
 exploiting the concept of [watermarks]({{< ref "docs/dev/table/concepts/time_attributes" >}}).

-Another example is the following query that computes the number of clicks per session.
+Another example is the following query that computes the word count.

 ```sql
-SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
+CREATE TABLE doc (
+  word STRING
+) WITH (
+  'connector' = '...'
+);
+CREATE TABLE word_cnt (
+  word STRING PRIMARY KEY NOT ENFORCED,
+  cnt BIGINT
+) WITH (
+  'connector' = '...'
+);
+
+INSERT INTO word_cnt
+SELECT word, COUNT(1) AS cnt
+FROM doc
+GROUP BY word;
 ```

-The `sessionId` attribute is used as a grouping key and the continuous query maintains a count
-for each `sessionId` it observes. The `sessionId` attribute is evolving over time and `sessionId`
-values are only active until the session ends, i.e., for a limited period of time. However, the
-continuous query cannot know about this property of `sessionId` and expects that every `sessionId`
-value can occur at any point of time. It maintains a count for each observed `sessionId` value.
-Consequently, the total state size of the query is continuously growing as more and more `sessionId`
-values are observed.
+The `word` field is used as a grouping key, and the continuous query writes a count
+for each `word` it observes to the sink.
+The set of `word` values evolves over time, and because the continuous query never ends, the framework needs to maintain a count for each observed `word` value.
+Consequently, the total state size of the query is continuously growing as more and more `word` values are observed.
+
+{{< img alt="Explicit-derived stateful op" src="/fig/table-streaming/explicit-derived-stateful-op.png" width="85%">}}
+
+Queries such as `SELECT ... FROM ... WHERE` which only consist of field projections or filters are usually
+stateless pipelines.
+However, in some situations the stateful operation is implicitly derived from the characteristics of the input (*e.g.*, the input is a changelog, see
+[Table to Stream Conversion]({{< ref "docs/dev/table/concepts/dynamic_tables" >}}#table-to-stream-conversion)),
+or through user configuration (see [`table-exec-source-cdc-events-duplicate`]({{< ref "docs/dev/table/config" >}}#table-exec-source-cdc-events-duplicate)).
+
+The following figure illustrates a `SELECT ... FROM` statement that queries an [upsert Kafka source]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+```sql
+CREATE TABLE upsert_kakfa (
+  id INT PRIMARY KEY NOT ENFORCED,
+  message STRING
+) WITH (
+  'connector' = 'upsert-kafka',
+  ...
+);
+
+SELECT * FROM upsert_kakfa;
+```
+The table source only provides messages of *INSERT*, *UPDATE_AFTER*, and *DELETE* type, while the downstream sink requires a complete changelog (including *UPDATE_BEFORE*).
+As a result, although the query itself does not involve an explicit stateful computation, the planner still generates a stateful operator called "ChangelogNormalize" to help obtain the complete changelog.
+{{< img alt="Implicit-derived stateful op" src="/fig/table-streaming/implicit-derived-stateful-op.png" width="85%">}}
+
+{{< hint info >}}
+Please refer to the individual operator documentation for more details about how much state is required
+and how to limit a potentially ever-growing state size.
+{{< /hint >}}

 #### Idle State Retention Time

 The *Idle State Retention Time* parameter [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
 defines for how long the state of a key is retained without being updated before it is removed.
-For the previous example query, the count of a `sessionId` would be removed as soon as it has not
+For the previous example query, the count of a `word` would be removed as soon as it has not
 been updated for the configured period of time.

 By removing the state of a key, the continuous query completely forgets that it has seen this key
 before. If a record with a key, whose state has been removed before, is processed, the record will
 be treated as if it was the first record with the respective key. For the example above this means
-that the count of a `sessionId` would start again at `0`.
+that the count of a `word` would start again at `0`.
+
+#### Configure Operator-level State TTL
+
+{{< hint warning >}}
+This is an advanced feature and should be used with caution. It is only suitable for cases
+in which the pipeline uses multiple states
+and you need to set a different TTL (time-to-live) for each state.
+If the pipeline does not involve stateful computations, you do not need to follow this procedure.
+If the pipeline only uses one state, you only need to set [`table.exec.state.ttl`]({{< ref "docs/dev/table/config" >}}#table-exec-state-ttl)
+at the pipeline level.
+{{< /hint >}}
+
+Starting from Flink v1.18, Table API & SQL supports configuring fine-grained state TTL at the operator level to improve state usage.
+To be more specific, the number of used states can be defined as the configuration granularity and is associated with each input state of the operator.
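To make the regular-join discussion in the hunk above more concrete, here is a minimal sketch contrasting a regular join with an interval join. The `Orders` and `Shipments` tables and their columns are invented for illustration, and the interval join assumes that `order_time` and `ship_time` are event-time attributes with watermarks defined.

```sql
-- Regular join: for correct SQL semantics the operator must keep both inputs
-- in state entirely, because a matching row may still arrive on either side.
SELECT o.id, s.ship_time
FROM Orders o
JOIN Shipments s ON o.id = s.order_id;

-- Interval join: the time-bounded condition lets the runtime drop state that
-- can no longer produce matches once watermarks pass the interval boundary.
SELECT o.id, s.ship_time
FROM Orders o, Shipments s
WHERE o.id = s.order_id
  AND s.ship_time BETWEEN o.order_time AND o.order_time + INTERVAL '4' HOUR;
```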
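For the *Idle State Retention Time* paragraph, a small sketch of how the pipeline-level TTL might be applied to the word-count example above; the `SET` statement assumes SQL-client syntax, and the one-hour value is only an example.

```sql
-- With a 1-hour idle state retention, the count for a word that has not been
-- updated for one hour is dropped; if the word reappears later, its count
-- starts again at 0.
SET 'table.exec.state.ttl' = '1 h';

INSERT INTO word_cnt
SELECT word, COUNT(1) AS cnt
FROM doc
GROUP BY word;
```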
Review Comment:
   Due to the one-to-many relationship between an operator and its states, we define the configurable minimum granularity as the number of incoming edges of each stateful operator. For a OneInputStreamOperator, one state TTL can be configured; for a TwoInputStreamOperator (such as a Join), which has two inputs, a TTL can be configured for the left state and the right state, respectively. More generally, for a KInputStreamOperator, K state TTLs can be configured. Assuming that a job uses two stateful OneInputStreamOperators and one stateful TwoInputStreamOperator, the total number of states that can be configured is 1 * 2 + 2 = 4.
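   To illustrate how the per-input granularity described in this comment could be exercised, below is a rough sketch based on the compiled-plan workflow (`COMPILE PLAN` / `EXECUTE PLAN`). The file path, sink table, and join query are placeholders, and the exact shape of the per-operator state metadata in the generated JSON should be double-checked against the Flink 1.18 docs.

```sql
-- Serialize the pipeline into a JSON plan file. For the two-input join below,
-- the plan is expected to expose two state entries (one per input), whose
-- TTLs can then be edited independently in the generated file.
COMPILE PLAN 'file:///tmp/join_plan.json' FOR
INSERT INTO enriched_orders
SELECT o.id, s.ship_time
FROM Orders o
JOIN Shipments s ON o.id = s.order_id;

-- Submit the (possibly edited) plan instead of re-running the SQL statement.
EXECUTE PLAN 'file:///tmp/join_plan.json';
```

   Each stateful input of an operator would be expected to surface its own entry in that metadata (e.g. something like `{"index": 0, "ttl": "3600000 ms", "name": "leftState"}` for the left side of the join), which lines up with the K-input / K-TTL rule above.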