lsyldliu commented on code in PR #24975: URL: https://github.com/apache/flink/pull/24975#discussion_r1665279963
########## docs/content.zh/docs/dev/table/materialized-table/overview.md: ########## @@ -0,0 +1,66 @@ +--- +title: 概览 +weight: 1 +type: docs +aliases: +- /dev/table/materialized-table/ +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# 介绍 + +物化表是 Flink SQL 引入的一种新的表类型,旨在简化批处理和流处理数据管道,提供一致的开发体验。在创建物化表时,通过指定数据新鲜度和查询,Flink 引擎会自动推导出物化表的 Schema ,并创建相应的数据刷新管道,以达到指定的新鲜度。 + +{{< hint warning >}} +**注意**:该功能目前是一个 MVP(最小可行产品)功能,仅适用于连接到以 [Standalone]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}) 模式部署的 Flink 集群 的 [SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) +{{< /hint >}} + +# 核心概念 + +物化表包含以下核心概念:数据新鲜度、刷新模式、查询定义和 `Schema` 。 + +## 数据新鲜度 + +数据新鲜度定义了物化表数据相对于基础表更新的最大滞后时间。它并非绝对保证,而是 Flink 尝试达到的目标。框架会尽力确保物化表中的数据在指定的新鲜度内刷新。 + +数据新鲜度是物化表的一个关键属性,具有两个主要作用: +- **确定刷新模式**:目前有连续模式和全量模式。关于如何确定刷新模式的详细信息,请参阅 [materialized-table.refresh-mode.freshness-threshold]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold) 配置项。 + - 连续模式:启动 Flink 流作业,持续刷新物化表数据。 + - 全量模式:工作流调度器定期触发 Flink 批处理作业,全量刷新物化表数据。 +- **确定刷新频率**: + - 连续模式下,数据新鲜度转换为 Flink 流作业的 `checkpoint` 间隔。 + - 全量模式下,数据新鲜度转换为工作流的调度周期,例如 `cron` 表达式。 + +## 刷新模式 + +刷新模式有连续模式和全量模式两种。默认情况下,根据数据新鲜度推断刷新模式。用户可以为特定业务场景显式指定刷新模式,该模式将优先于数据新鲜度推断。 Review 
Comment: 该模式将优先于数据新鲜度推断 -> 它的优先级高于根据数据新鲜度推导的刷新模式。 ########## docs/content.zh/docs/dev/table/materialized-table/quickstart.md: ########## @@ -0,0 +1,317 @@ +--- +title: 快速入门 +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quickstart.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + + +# 快速入门 + +本入门指南将帮助你快速了解并开始使用物化表。内容包括环境设置,以及创建、修改和删除连续模式和全量模式的物化表。 + +# 架构介绍 + +- **Client**: 可以是任何能够与 [Flink SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) 交互的客户端,如 [SQL 客户端]({{< ref "docs/dev/table/sqlClient" >}})、[Flink JDBC 驱动]({{< ref "docs/dev/table/jdbcDriver" >}}) 等。 +- **Flink SQL Gateway**: 支持创建、修改和删除物化表。并包含了一个内置的工作流调度器,用于定期刷新全量模式的物化表。 +- **Flink Cluster**: 用于运行物化表刷新作业的 Flink 集群。 +- **Catalog**: 负责管理物化表元数据的创建、查询、修改和删除。 +- **Catalog Store**: 提供 Catalog 属性持久化功能,以便在 SQL 操作时自动初始化 Catalog 并获取相关的元数据。 Review Comment: SQL 操作 -> 操作物化表 ########## docs/content.zh/docs/dev/table/materialized-table/overview.md: ########## @@ -0,0 +1,66 @@ +--- +title: 概览 +weight: 1 +type: docs +aliases: +- /dev/table/materialized-table/ +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# 介绍 + +物化表是 Flink SQL 引入的一种新的表类型,旨在简化批处理和流处理数据管道,提供一致的开发体验。在创建物化表时,通过指定数据新鲜度和查询,Flink 引擎会自动推导出物化表的 Schema ,并创建相应的数据刷新管道,以达到指定的新鲜度。 + +{{< hint warning >}} +**注意**:该功能目前是一个 MVP(最小可行产品)功能,仅适用于连接到以 [Standalone]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}) 模式部署的 Flink 集群 的 [SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) +{{< /hint >}} + +# 核心概念 + +物化表包含以下核心概念:数据新鲜度、刷新模式、查询定义和 `Schema` 。 + +## 数据新鲜度 + +数据新鲜度定义了物化表数据相对于基础表更新的最大滞后时间。它并非绝对保证,而是 Flink 尝试达到的目标。框架会尽力确保物化表中的数据在指定的新鲜度内刷新。 + +数据新鲜度是物化表的一个关键属性,具有两个主要作用: +- **确定刷新模式**:目前有连续模式和全量模式。关于如何确定刷新模式的详细信息,请参阅 [materialized-table.refresh-mode.freshness-threshold]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold) 配置项。 + - 连续模式:启动 Flink 流作业,持续刷新物化表数据。 + - 全量模式:工作流调度器定期触发 Flink 批处理作业,全量刷新物化表数据。 +- **确定刷新频率**: + - 连续模式下,数据新鲜度转换为 Flink 流作业的 `checkpoint` 间隔。 + - 全量模式下,数据新鲜度转换为工作流的调度周期,例如 `cron` 表达式。 + +## 刷新模式 + +刷新模式有连续模式和全量模式两种。默认情况下,根据数据新鲜度推断刷新模式。用户可以为特定业务场景显式指定刷新模式,该模式将优先于数据新鲜度推断。 + +- **连续模式**:Flink 流作业会增量更新物化表数据,当下游数据为 checkpoint 完成才可见时,其数据刷新频率与作业的 checkpoint 间隔相匹配。 +- **全量模式**:调度器会定期触发对物化表数据的全量覆盖,其数据刷新周期与工作流的调度周期相匹配。 + - 默认的覆盖行为是表级别的。如果分区字段存在,并且通过 [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 指定了时间分区字段格式,则按照分区覆盖,每次只刷新最新的分区。 
Review Comment: 则按照分区覆盖,每次只刷新最新的分区。 -> 则按照分区粒度覆盖,即每次只刷新最新的分区。 ########## docs/content.zh/docs/dev/table/materialized-table/statements.md: ########## @@ -0,0 +1,348 @@ +--- +title: 语法说明 +weight: 2 +type: docs +aliases: +- /dev/table/materialized-table/statements.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# 物化表语法 + +Flink SQL 目前支持以下物化表操作: +- [CREATE MATERIALIZED TABLE](#create-materialized-table) +- [ALTER MATERIALIZED TABLE](#alter-materialized-table) +- [DROP MATERIALIZED TABLE](#drop-materialized-table) + +# CREATE MATERIALIZED TABLE + +``` +CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name + +[ ([ <table_constraint> ]) ] + +[COMMENT table_comment] + +[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] + +[WITH (key1=val1, key2=val2, ...)] + +FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY } + +[REFRESH_MODE = { CONTINUOUS | FULL }] + +AS <select_statement> + +<table_constraint>: + [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) 
NOT ENFORCED +``` + +## PRIMARY KEY + +`PRIMARY KEY` 定义了一组可选的列,用于唯一标识表中的每一行。主键列必须非空。 + +## PARTITIONED BY + +`PARTITIONED BY` 定义了一组可选的列,用于对物化表进行分区。如果物化表使用了 `filesystem connector` ,每个分区将创建一个目录。 + +**示例:** + +```sql +-- 创建一个物化表并将分区字段设置为 `ds`。 +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + FRESHNESS = INTERVAL '1' HOUR + AS SELECT + ds + FROM + ... +``` + +<span class="label label-danger">注意</span> +- 分区字段必须是物化表查询语句中的字段。 + +## WITH Options + +`WITH Options` 可以定义创建物化表所需的属性,包括[连接器参数]({{< ref "docs/connectors/table/" >}})和分区字段的[时间格式参数]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter)。 + +```sql +-- 创建一个物化表,指定分区字段为 'ds' 和对应的时间格式为 'yyyy-MM-dd' +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + WITH ( + 'format' = 'json', + 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' + ) + ... +``` + +如上例所示,我们为 `ds` 分区列指定了 `date-formatter` 选项。每次调度时,调度时间将转换为相应的 `ds` 分区值。例如,在 `2024-01-01 00:00:00` 的调度时间下,只有分区 `ds = '2024-01-01'` 会被刷新。 + +<span class="label label-danger">注意</span> +- `partition.fields.#.date-formatter` 选项仅适用于全量模式。 +- [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 中的字段必须是有效的字符串类型分区字段。 + +## FRESHNESS + +`FRESHNESS` 用于指定物化表的数据新鲜度。 + +**数据新鲜度与刷新模式关系** + +数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过[配置]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。 + +**FRESHNESS 参数详解** + +FRESHNESS 参数的取值范围为 INTERVAL `'<num>'` { SECOND | MINUTE | HOUR | DAY }。`'<num>'` 必须为正整数,并且在全量模式下,`'<num>'` 应该是相应时间间隔单位的公约数。 + +**示例:** +(假定 `materialized-table.refresh-mode.freshness-threshold` 为 30 分钟) + +```sql +-- 对应的刷新管道是一个 checkpoint 间隔为 1 秒的流处理作业 +FRESHNESS = INTERVAL '1' SECOND + +-- 对应的刷新管道是一个 checkpoint 间隔为 1 分钟的流处理作业 +FRESHNESS = INTERVAL '1' MINUTE + +-- 对应的刷新管道是一个调度周期为 1 小时的调度工作流 +FRESHNESS = INTERVAL '1' HOUR + +-- 对应的刷新管道是一个调度周期为 1 
天的调度工作流 +FRESHNESS = INTERVAL '1' DAY +``` + +**不合法的 `FRESHNESS` 示例:** + +```sql +-- 间隔为负数 +FRESHNESS = INTERVAL '-1' SECOND + +-- 间隔为0 +FRESHNESS = INTERVAL '0' SECOND + +-- 间隔为月或者年 +FRESHNESS = INTERVAL '1' MONTH +FRESHNESS = INTERVAL '1' YEAR + +-- 全量模式下,间隔不是对应时间间隔单位的公约数 +FRESHNESS = INTERVAL '60' SECOND +FRESHNESS = INTERVAL '5' HOUR +``` + +<span class="label label-danger">注意</span> +- 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。 +- 在连续模式下,数据新鲜度和 `checkpoint` 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 `checkpoint` 性能,建议[开启 Changelog]({{< ref "docs/ops/state/state_backends" >}}#开启-changelog)。 +- 在全量模式下,数据新鲜度必须转换为 `cron` 表达式,因此目前仅支持在预定义时间跨度内的新鲜度间隔,这种设计确保了与 `cron` 能力的一致性。具体支持以下新鲜度间隔: Review Comment: 因此目前仅支持在预定义时间跨度内的新鲜度间隔,这种设计确保了与 `cron` 能力的一致性 -> 因此目前仅支持在预定义时间间隔单位内的新鲜度间隔,这种设计确保了与 `cron` 表达式语义的一致性 ########## docs/content/docs/dev/table/materialized-table/statements.md: ########## @@ -0,0 +1,342 @@ +--- +title: Statements +weight: 2 +type: docs +aliases: +- /dev/table/materialized-table/statements.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# Materialized Table Statements + +Flink SQL supports the following Materialized Table statements for now: +- [CREATE MATERIALIZED TABLE](#create-materialized-table) +- [ALTER MATERIALIZED TABLE](#alter-materialized-table) +- [DROP MATERIALIZED TABLE](#drop-materialized-table) + +# CREATE MATERIALIZED TABLE + +``` +CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name + +[ ([ <table_constraint> ]) ] + +[COMMENT table_comment] + +[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] + +[WITH (key1=val1, key2=val2, ...)] + +FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY } + +[REFRESH_MODE = { CONTINUOUS | FULL }] + +AS <select_statement> + +<table_constraint>: + [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED +``` + +## PRIMARY KEY + +`PRIMARY KEY` defines an optional list of columns that uniquely identifies each row within the table. Columns used as the primary key must be non-null. + +## PARTITIONED BY + +`PARTITIONED BY` defines an optional list of columns to partition the materialized table. A directory is created for each partition if this materialized table is used as a filesystem sink. + +**Example:** + +```sql +-- Create a materialized table and specify the partition field as `ds`. +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + FRESHNESS = INTERVAL '1' HOUR + AS SELECT + ds + FROM + ... +``` + +<span class="label label-danger">Note</span> +- The partition column must be included in the query statement of the materialized table. + +## WITH Options + +`WITH Options` are used to specify the materialized table properties, including [connector options]({{< ref "docs/connectors/table/" >}}) and [time format option]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) for partition fields.
+ +```sql +-- Create a materialized table, specify the partition field as 'ds', and the corresponding time format as 'yyyy-MM-dd' +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + WITH ( + 'format' = 'json', + 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' + ) + ... +``` + +As shown in the above example, we specified the `date-formatter` option for the `ds` partition column. At each scheduling time, the scheduling time will be converted to the `ds` partition value. For example, for a scheduling time of `2024-01-01 00:00:00`, only the partition `ds = '2024-01-01'` will be refreshed. + +<span class="label label-danger">Note</span> +- The `partition.fields.#.date-formatter` option only works in full mode. +- The field in the [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) must be a valid string type partition field. + +## FRESHNESS + +`FRESHNESS` defines the data freshness of a materialized table. + +**FRESHNESS and Refresh Mode Relationship** + +FRESHNESS defines the maximum amount of time that the materialized table’s content should lag behind updates to the base tables. It does two things: first, it determines the [refresh mode]({{< ref "docs/dev/table/materialized-table/overview" >}}#refresh-mode) of the materialized table through [configuration]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold); second, it determines the data refresh frequency to meet the actual data freshness requirements. + +**Explanation of FRESHNESS Parameter** + +The FRESHNESS parameter range is INTERVAL `'<num>'` { SECOND | MINUTE | HOUR | DAY }. `'<num>'` must be a positive integer, and in FULL mode, `'<num>'` should be a common divisor of the respective time interval unit.
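The relationship between the declared freshness and the `materialized-table.refresh-mode.freshness-threshold` option can be sketched as follows. This is an illustrative sketch, not Flink's internal code; in particular, the behavior at exactly the threshold is an assumption.

```python
from datetime import timedelta

# Sketch: derive the refresh mode by comparing the declared FRESHNESS against
# materialized-table.refresh-mode.freshness-threshold (30 minutes here, matching
# the examples in this document). Boundary behavior at exactly the threshold is
# an assumption, not documented behavior.
def derive_refresh_mode(freshness: timedelta,
                        threshold: timedelta = timedelta(minutes=30)) -> str:
    # Freshness below the threshold -> CONTINUOUS streaming job whose checkpoint
    # interval equals the freshness; otherwise -> FULL scheduled batch job.
    return "CONTINUOUS" if freshness < threshold else "FULL"

print(derive_refresh_mode(timedelta(seconds=1)))  # CONTINUOUS
print(derive_refresh_mode(timedelta(hours=1)))    # FULL
```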
+ +**Examples:** +(Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes) + +```sql +-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 1 second +FRESHNESS = INTERVAL '1' SECOND + +-- The corresponding refresh pipeline is a streaming job with a checkpoint interval of 1 minute +FRESHNESS = INTERVAL '1' MINUTE + +-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 hour +FRESHNESS = INTERVAL '1' HOUR + +-- The corresponding refresh pipeline is a scheduled workflow with a schedule cycle of 1 day +FRESHNESS = INTERVAL '1' DAY +``` + +**Invalid `FRESHNESS` Examples:** + +```sql +-- Interval is a negative number +FRESHNESS = INTERVAL '-1' SECOND + +-- Interval is 0 +FRESHNESS = INTERVAL '0' SECOND + +-- Interval is in months or years +FRESHNESS = INTERVAL '1' MONTH +FRESHNESS = INTERVAL '1' YEAR + +-- In FULL mode, the interval is not a common divisor of the respective time range +FRESHNESS = INTERVAL '60' SECOND +FRESHNESS = INTERVAL '5' HOUR +``` + +<span class="label label-danger">Note</span> +- The materialized table data will be refreshed as closely as possible within the defined freshness, but strict satisfaction of the freshness cannot be guaranteed. +- In CONTINUOUS mode, setting a data freshness interval that is too short can impact job performance as it aligns with the checkpoint interval. To optimize checkpoint performance, consider [enabling changelog]({{< ref "docs/ops/state/state_backends" >}}#incremental-checkpoints). +- In FULL mode, data freshness must be translated into a cron expression; consequently, only freshness intervals within predefined time spans are currently supported. This design ensures alignment with cron's capabilities. Specifically, the following freshness intervals are supported: + - Second: 30, 15, 10, 5, 2, and 1 second intervals. + - Minute: 30, 15, 10, 5, 2, and 1 minute intervals. + - Hour: 8, 4, 2, and 1 hour intervals. + - Day: 1 day.
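The FULL-mode interval whitelist above can be captured as a simple lookup. This is an illustrative encoding of the documented rule, not Flink's actual validation code; note that `60 SECOND` is rejected even though 60 divides 60, since it should be written as `1 MINUTE`.

```python
# Illustrative encoding of the FULL-mode freshness intervals documented above.
# This mirrors the documented whitelist; it is not Flink's validation code.
SUPPORTED_FULL_MODE_FRESHNESS = {
    "SECOND": {30, 15, 10, 5, 2, 1},
    "MINUTE": {30, 15, 10, 5, 2, 1},
    "HOUR": {8, 4, 2, 1},
    "DAY": {1},
}

def is_supported_full_mode_freshness(num: int, unit: str) -> bool:
    """Check whether FRESHNESS = INTERVAL '<num>' <unit> is valid in FULL mode."""
    return num in SUPPORTED_FULL_MODE_FRESHNESS.get(unit, set())

print(is_supported_full_mode_freshness(10, "MINUTE"))  # True
print(is_supported_full_mode_freshness(5, "HOUR"))     # False: not a divisor of 24
print(is_supported_full_mode_freshness(60, "SECOND"))  # False: use INTERVAL '1' MINUTE
```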
+ +## REFRESH_MODE + +`REFRESH_MODE` is used to explicitly specify the refresh mode of the materialized table. The specified mode takes precedence over the framework's automatic inference to meet specific scenarios' needs. + +**Examples:** +(Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes) + +```sql +-- The refresh mode of the created materialized table is CONTINUOUS, and the job's checkpoint interval is 1 hour. +CREATE MATERIALIZED TABLE my_materialized_table + REFRESH_MODE = CONTINUOUS + FRESHNESS = INTERVAL '1' HOUR + AS SELECT + ... + +-- The refresh mode of the created materialized table is FULL, and the job's schedule cycle is 10 minutes. +CREATE MATERIALIZED TABLE my_materialized_table + REFRESH_MODE = FULL + FRESHNESS = INTERVAL '10' MINUTE + AS SELECT + ... +``` + +## AS <select_statement> + +This clause is used to fill the materialized table with data from the select query. The upstream table can be a materialized table, table, or view. The select statement supports all Flink SQL [Queries]({{< ref "docs/dev/table/sql/queries/overview" >}}).
+ +**Example:** + +```sql +CREATE MATERIALIZED TABLE my_materialized_table + FRESHNESS = INTERVAL '10' SECOND + AS SELECT * FROM kafka_catalog.db1.kafka_table; +``` + +## Examples + +**(Assuming `materialized-table.refresh-mode.freshness-threshold` is 30 minutes)** + +Create a materialized table with a data freshness of 10 seconds and the derived refresh mode is CONTINUOUS: + +```sql +CREATE MATERIALIZED TABLE my_materialized_table_continuous + PARTITIONED BY (ds) + WITH ( + 'format' = 'debezium-json', + 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' + ) + FRESHNESS = INTERVAL '10' SECOND + AS + SELECT + k.ds, + k.user_id, + COUNT(*) AS event_count, + SUM(k.amount) AS total_amount, + MAX(u.age) AS max_age + FROM + kafka_catalog.db1.kafka_table k + JOIN + user_catalog.db1.user_table u + ON + k.user_id = u.user_id + WHERE + k.event_type = 'purchase' + GROUP BY + k.ds, k.user_id +``` + +Create a materialized table with a data freshness of 1 hour and the derived refresh mode is FULL: + +```sql +CREATE MATERIALIZED TABLE my_materialized_table_full + PARTITIONED BY (ds) + WITH ( + 'format' = 'json', + 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' + ) + FRESHNESS = INTERVAL '1' HOUR + AS + SELECT + p.ds, + p.product_id, + p.product_name, + AVG(s.sale_price) AS avg_sale_price, + SUM(s.quantity) AS total_quantity + FROM + paimon_catalog.db1.product_table p + LEFT JOIN + paimon_catalog.db1.sales_table s + ON + p.product_id = s.product_id + WHERE + p.category = 'electronics' + GROUP BY + p.ds, p.product_id, p.product_name +``` + +## Limitations + +- Does not support explicitly specifying columns +- Does not support modified query statements +- Does not support using temporary tables, temporary views, or temporary functions in the select query + +# ALTER MATERIALIZED TABLE + +``` +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec] +``` + +`ALTER MATERIALIZED TABLE` is 
used to manage materialized tables. This command allows users to suspend and resume the refresh pipeline of materialized tables and manually trigger data refreshes. + +## SUSPEND + +``` +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND +``` + +`SUSPEND` is used to pause the background refresh pipeline of the materialized table. + +**Example:** + +```sql +-- Specify SAVEPOINT path before pausing +SET 'execution.checkpointing.savepoint-dir' = 'hdfs://savepoint_path'; + +-- Suspend the specified materialized table +ALTER MATERIALIZED TABLE my_materialized_table SUSPEND; +``` + +<span class="label label-danger">Note</span> +- When suspending a table in CONTINUOUS mode, the job will be paused using STOP WITH SAVEPOINT by default. You need to set the SAVEPOINT save path using [parameters]({{< ref "docs/deployment/config" >}}#execution-checkpointing-savepoint-dir). + +## RESUME + +``` +ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1=val1, key2=val2, ...)] +``` + +`RESUME` is used to resume the refresh pipeline of a materialized table. Temporary parameters can be specified through `WITH options`, which take effect only for the current refresh pipeline and are not persisted. Review Comment: Temporary parameters -> Materialized table dynamic options ########## docs/content.zh/docs/dev/table/materialized-table/quickstart.md: ########## @@ -0,0 +1,317 @@ +--- +title: 快速入门 +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quickstart.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + + +# 快速入门 + +本入门指南将帮助你快速了解并开始使用物化表。内容包括环境设置,以及创建、修改和删除连续模式和全量模式的物化表。 + +# 架构介绍 + +- **Client**: 可以是任何能够与 [Flink SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) 交互的客户端,如 [SQL 客户端]({{< ref "docs/dev/table/sqlClient" >}})、[Flink JDBC 驱动]({{< ref "docs/dev/table/jdbcDriver" >}}) 等。 +- **Flink SQL Gateway**: 支持创建、修改和删除物化表。并包含了一个内置的工作流调度器,用于定期刷新全量模式的物化表。 +- **Flink Cluster**: 用于运行物化表刷新作业的 Flink 集群。 +- **Catalog**: 负责管理物化表元数据的创建、查询、修改和删除。 +- **Catalog Store**: 提供 Catalog 属性持久化功能,以便在 SQL 操作时自动初始化 Catalog 并获取相关的元数据。 + +{{< img src="/fig/materialized-table-architecture.svg" alt="Illustration of Flink Materialized Table Architecture" width="85%" >}} + +# 环境搭建 + +## 目录准备 + +**请将下面的示例路径替换为你机器上的实际路径。** + +- 创建 Catalog Store 和 Catalog 所需的目录 + +``` +# 用于 File Catalog Store 保存 Catalog 属性 +mkdir -p {catalog_store_path} + +# 用于 test-filesystem Catalog 保存元数据和表数据 +mkdir -p {catalog_path} + +# 用于 test-filesystem Catalog 的默认数据库 +mkdir -p {catalog_path}/mydb +``` + +- 创建目录分别用于保存 Checkpoints 和 Savepoints: + +``` +mkdir -p {checkpoints_path} + +mkdir -p {savepoints_path} +``` + +## 资源准备 + +这里的方法和[本地安装]({{< ref "docs/try-flink/local_installation" >}})中记录的步骤类似。Flink 可以运行在任何类 UNIX 的操作系统下面,例如:Linux, Mac OS X 和 Cygwin (for Windows)。你需要在本地安装好 __Java__,可以通过下述命令行的方式检查安装好的 Java 版本: Review Comment: I think we should remove the sentence: 你需要在本地安装好 __Java__,可以通过下述命令行的方式检查安装好的 Java 版本: We don't need to emphasize that java version, the user knows this pre-condition. 
########## docs/content.zh/docs/dev/table/materialized-table/statements.md: ########## @@ -0,0 +1,348 @@ +--- +title: 语法说明 +weight: 2 +type: docs +aliases: +- /dev/table/materialized-table/statements.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# 物化表语法 + +Flink SQL 目前支持以下物化表操作: +- [CREATE MATERIALIZED TABLE](#create-materialized-table) +- [ALTER MATERIALIZED TABLE](#alter-materialized-table) +- [DROP MATERIALIZED TABLE](#drop-materialized-table) + +# CREATE MATERIALIZED TABLE + +``` +CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name + +[ ([ <table_constraint> ]) ] + +[COMMENT table_comment] + +[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] + +[WITH (key1=val1, key2=val2, ...)] + +FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY } + +[REFRESH_MODE = { CONTINUOUS | FULL }] + +AS <select_statement> + +<table_constraint>: + [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) 
NOT ENFORCED +``` + +## PRIMARY KEY + +`PRIMARY KEY` 定义了一组可选的列,用于唯一标识表中的每一行。主键列必须非空。 + +## PARTITIONED BY + +`PARTITIONED BY` 定义了一组可选的列,用于对物化表进行分区。如果物化表使用了 `filesystem connector` ,每个分区将创建一个目录。 + +**示例:** + +```sql +-- 创建一个物化表并将分区字段设置为 `ds`。 +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + FRESHNESS = INTERVAL '1' HOUR + AS SELECT + ds + FROM + ... +``` + +<span class="label label-danger">注意</span> +- 分区字段必须是物化表查询语句中的字段。 + +## WITH Options + +`WITH Options` 可以定义创建物化表所需的属性,包括[连接器参数]({{< ref "docs/connectors/table/" >}})和分区字段的[时间格式参数]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter)。 + +```sql +-- 创建一个物化表,指定分区字段为 'ds' 和对应的时间格式为 'yyyy-MM-dd' +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + WITH ( + 'format' = 'json', + 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' + ) + ... +``` + +如上例所示,我们为 `ds` 分区列指定了 `date-formatter` 选项。每次调度时,调度时间将转换为相应的 `ds` 分区值。例如,在 `2024-01-01 00:00:00` 的调度时间下,只有分区 `ds = '2024-01-01'` 会被刷新。 + +<span class="label label-danger">注意</span> +- `partition.fields.#.date-formatter` 选项仅适用于全量模式。 +- [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 中的字段必须是有效的字符串类型分区字段。 + +## FRESHNESS + +`FRESHNESS` 用于指定物化表的数据新鲜度。 + +**数据新鲜度与刷新模式关系** + +数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过[配置]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。 + +**FRESHNESS 参数详解** + +FRESHNESS 参数的取值范围为 INTERVAL `'<num>'` { SECOND | MINUTE | HOUR | DAY }。`'<num>'` 必须为正整数,并且在全量模式下,`'<num>'` 应该是相应时间间隔单位的公约数。 + +**示例:** +(假定 `materialized-table.refresh-mode.freshness-threshold` 为 30 分钟) + +```sql +-- 对应的刷新管道是一个 checkpoint 间隔为 1 秒的流处理作业 +FRESHNESS = INTERVAL '1' SECOND + +-- 对应的刷新管道是一个 checkpoint 间隔为 1 分钟的流处理作业 +FRESHNESS = INTERVAL '1' MINUTE + +-- 对应的刷新管道是一个调度周期为 1 小时的调度工作流 +FRESHNESS = INTERVAL '1' HOUR + +-- 对应的刷新管道是一个调度周期为 1 
天的调度工作流 +FRESHNESS = INTERVAL '1' DAY +``` + +**不合法的 `FRESHNESS` 示例:** + +```sql +-- 间隔为负数 +FRESHNESS = INTERVAL '-1' SECOND + +-- 间隔为0 +FRESHNESS = INTERVAL '0' SECOND + +-- 间隔为月或者年 +FRESHNESS = INTERVAL '1' MONTH +FRESHNESS = INTERVAL '1' YEAR + +-- 全量模式下,间隔不是对应时间间隔单位的公约数 +FRESHNESS = INTERVAL '60' SECOND +FRESHNESS = INTERVAL '5' HOUR +``` + +<span class="label label-danger">注意</span> +- 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。 +- 在连续模式下,数据新鲜度和 `checkpoint` 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 `checkpoint` 性能,建议[开启 Changelog]({{< ref "docs/ops/state/state_backends" >}}#开启-changelog)。 +- 在全量模式下,数据新鲜度必须转换为 `cron` 表达式,因此目前仅支持在预定义时间跨度内的新鲜度间隔,这种设计确保了与 `cron` 能力的一致性。具体支持以下新鲜度间隔: + - 秒:30、15、10、5、2 和 1 秒间隔。 + - 分钟:30、15、10、5、2 和 1 分钟间隔。 + - 小时:8、4、2 和 1 小时间隔。 + - 天:1 天。 + +## REFRESH_MODE + +`REFRESH_MODE` 用于显式指定物化表的刷新模式。指定的模式优先于框架的自动推断,以满足特定场景的需求。 Review Comment: 指定的模式优先于框架的自动推断 -> 指定的刷新模式比框架自动推导的模式具有更高的优先级。 ########## docs/content/docs/dev/table/materialized-table/quickstart.md: ########## @@ -0,0 +1,317 @@ +--- +title: Quickstart +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quickstart.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# Quickstart Guide + +This guide will help you quickly understand and get started with materialized tables. It includes setting up the environment and creating, altering, and dropping materialized tables in CONTINUOUS and FULL mode. + +# Architecture Introduction + +- **Client**: Could be any client that can interact with [Flink SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}), such as [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}), [Flink JDBC Driver]({{< ref "docs/dev/table/jdbcDriver" >}}) and so on. +- **Flink SQL Gateway**: Supports creating, altering, and dropping materialized tables. It also serves as an embedded workflow scheduler to periodically refresh full mode materialized tables. +- **Flink Cluster**: The pipeline for refreshing materialized tables will run on the Flink cluster. +- **Catalog**: Manages the creation, retrieval, modification, and deletion of the metadata of materialized tables. +- **Catalog Store**: Supports catalog property persistence to automatically initialize catalogs for retrieving metadata in SQL operations. 
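For illustration, the Client-to-Gateway path above can be exercised with a bare-bones REST client. The endpoint paths (`/v1/sessions`, `/v1/sessions/{handle}/statements`), the default REST port `8083`, and the payload field names below are assumptions based on the SQL Gateway REST API and may differ across Flink versions — treat this as a sketch, not a reference client.

```python
# Minimal sketch of a client talking to the Flink SQL Gateway REST endpoint.
# Endpoint paths, default port, and payload shapes are assumptions; verify them
# against the SQL Gateway REST API of your Flink version.
GATEWAY_URL = "http://localhost:8083"  # assumed default REST port

def open_session_request(session_name: str):
    """Build the (method, url, body) triple for opening a gateway session."""
    return ("POST", f"{GATEWAY_URL}/v1/sessions", {"sessionName": session_name})

def execute_statement_request(session_handle: str, statement: str):
    """Build the request that submits a SQL statement within a session."""
    return ("POST",
            f"{GATEWAY_URL}/v1/sessions/{session_handle}/statements",
            {"statement": statement})

method, url, body = execute_statement_request(
    "<session-handle>",
    "ALTER MATERIALIZED TABLE my_materialized_table SUSPEND")
print(method, url)
```

An actual client would POST these requests with any HTTP library and would typically poll the returned operation handle for results; the SQL Client and Flink JDBC Driver wrap this same interaction.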
+ +{{< img src="/fig/materialized-table-architecture.svg" alt="Illustration of Flink Materialized Table Architecture" width="85%" >}} + +# Environment Setup + +## Directory Preparation + +**Replace the example paths below with real paths on your machine.** + +- Create directories for Catalog Store and test-filesystem Catalog: + +``` +# Directory for File Catalog Store to save catalog information +mkdir -p {catalog_store_path} + +# Directory for test-filesystem Catalog to save table metadata and table data +mkdir -p {catalog_path} + +# Directory for the default database of test-filesystem Catalog +mkdir -p {catalog_path}/mydb +``` + +- Create directories for Checkpoints and Savepoints: + +``` +mkdir -p {checkpoints_path} + +mkdir -p {savepoints_path} +``` + +## Resource Preparation + +The method here is similar to the steps recorded in [local installation]({{< ref "docs/try-flink/local_installation" >}}). Flink can run on any UNIX-like operating system, such as Linux, Mac OS X, and Cygwin (for Windows). You need to have __Java__ installed locally. You can check the installed Java version with the following command: Review Comment: ditto ########## docs/content.zh/docs/dev/table/materialized-table/statements.md: ########## @@ -0,0 +1,348 @@ +--- +title: 语法说明 +weight: 2 +type: docs +aliases: +- /dev/table/materialized-table/statements.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# 物化表语法 + +Flink SQL 目前支持以下物化表操作: +- [CREATE MATERIALIZED TABLE](#create-materialized-table) +- [ALTER MATERIALIZED TABLE](#alter-materialized-table) +- [DROP MATERIALIZED TABLE](#drop-materialized-table) + +# CREATE MATERIALIZED TABLE + +``` +CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name + +[ ([ <table_constraint> ]) ] + +[COMMENT table_comment] + +[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] + +[WITH (key1=val1, key2=val2, ...)] + +FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY } + +[REFRESH_MODE = { CONTINUOUS | FULL }] + +AS <select_statement> + +<table_constraint>: + [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED +``` + +## PRIMARY KEY + +`PRIMARY KEY` 定义了一组可选的列,用于唯一标识表中的每一行。主键列必须非空。 + +## PARTITIONED BY + +`PARTITIONED BY` 定义了一组可选的列,用于对物化表进行分区。如果物化表使用了 `filesystem connector` ,每个分区将创建一个目录。 + +**示例:** + +```sql +-- 创建一个物化表并将分区字段设置为 `ds`。 +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + FRESHNESS = INTERVAL '1' HOUR + AS SELECT + ds + FROM + ... +``` + +<span class="label label-danger">注意</span> +- 分区字段必须是物化表查询语句中的字段。 + +## WITH Options + +`WITH Options` 可以定义创建物化表所需的属性,包括[连接器参数]({{< ref "docs/connectors/table/" >}})和分区字段的[时间格式参数]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter)。 + +```sql +-- 创建一个物化表,指定分区字段为 'ds' 和对应的时间格式为 'yyyy-MM-dd' +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + WITH ( + 'format' = 'json', + 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' + ) + ... 
+``` + +如上例所示,我们为 `ds` 分区列指定了 `date-formatter` 选项。每次调度时,调度时间将转换为相应的 `ds` 分区值。例如,在 `2024-01-01 00:00:00` 的调度时间下,只有分区 `ds = '2024-01-01'` 会被刷新。 + +<span class="label label-danger">注意</span> +- `partition.fields.#.date-formatter` 选项仅适用于全量模式。 +- [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 中的字段必须是有效的字符串类型分区字段。 + +## FRESHNESS + +`FRESHNESS` 用于指定物化表的数据新鲜度。 + +**数据新鲜度与刷新模式关系** + +数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过[配置]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。 + +**FRESHNESS 参数详解** + +FRESHNESS 参数的取值范围为 INTERVAL `'<num>'` { SECOND | MINUTE | HOUR | DAY }。`'<num>'` 必须为正整数,并且在全量模式下,`'<num>'` 应该是相应时间间隔单位的公约数。 + +**示例:** +(假定 `materialized-table.refresh-mode.freshness-threshold` 为 30 分钟) + +```sql +-- 对应的刷新管道是一个 checkpoint 间隔为 1 秒的流处理作业 +FRESHNESS = INTERVAL '1' SECOND + +-- 对应的刷新管道是一个 checkpoint 间隔为 1 分钟的流处理作业 +FRESHNESS = INTERVAL '1' MINUTE + +-- 对应的刷新管道是一个调度周期为 1 小时的调度工作流 +FRESHNESS = INTERVAL '1' HOUR + +-- 对应的刷新管道是一个调度周期为 1 天的调度工作流 +FRESHNESS = INTERVAL '1' DAY +``` + +**不合法的 `FRESHNESS` 示例:** + +```sql +-- 间隔为负数 +FRESHNESS = INTERVAL '-1' SECOND + +-- 间隔为0 +FRESHNESS = INTERVAL '0' SECOND + +-- 间隔为月或者年 +FRESHNESS = INTERVAL '1' MONTH +FRESHNESS = INTERVAL '1' YEAR + +-- 全量模式下,间隔不是对应时间间隔单位的公约数 +FRESHNESS = INTERVAL '60' SECOND +FRESHNESS = INTERVAL '5' HOUR +``` + +<span class="label label-danger">注意</span> +- 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。 +- 在连续模式下,数据新鲜度和 `checkpoint` 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 `checkpoint` 性能,建议[开启 Changelog]({{< ref "docs/ops/state/state_backends" >}}#开启-changelog)。 +- 在全量模式下,数据新鲜度必须转换为 `cron` 表达式,因此目前仅支持在预定义时间跨度内的新鲜度间隔,这种设计确保了与 `cron` 能力的一致性。具体支持以下新鲜度间隔: + - 秒:30、15、10、5、2 和 1 秒间隔。 + - 分钟:30、15、10、5、2 和 1 分钟间隔。 + - 小时:8、4、2 和 1 小时间隔。 + - 天:1 天。 + +## REFRESH_MODE + +`REFRESH_MODE` 用于显式指定物化表的刷新模式。指定的模式优先于框架的自动推断,以满足特定场景的需求。 + 
+**示例:** +(假定 `materialized-table.refresh-mode.freshness-threshold` 为 30 分钟) + +```sql +-- 创建的物化表的刷新模式为连续模式,作业的 checkpoint 间隔为 1 小时。 +CREATE MATERIALIZED TABLE my_materialized_table + REFRESH_MODE = CONTINUOUS + FRESHNESS = INTERVAL '1' HOUR + AS SELECT + ... + +-- 创建的物化表的刷新模式为全量模式,作业的调度周期为 10 分钟。 +CREATE MATERIALIZED TABLE my_materialized_table + REFRESH_MODE = FULL + FRESHNESS = INTERVAL '10' MINUTE + AS SELECT + ... +``` + +## AS <select_statement> + +该子句用于从 select 查询中填充物化表数据。上游表可以是物化表、表或视图。select 语句支持所有 Flink SQL [查询]({{< ref "docs/dev/table/sql/queries/overview" >}})。 Review Comment: 该子句用于从 select 查询中填充物化表数据 -> 该子句用于定义填充物化表数据的查询 ########## docs/content.zh/docs/dev/table/materialized-table/overview.md: ########## @@ -0,0 +1,66 @@ +--- +title: 概览 +weight: 1 +type: docs +aliases: +- /dev/table/materialized-table/ +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + +# 介绍 + +物化表是 Flink SQL 引入的一种新的表类型,旨在简化批处理和流处理数据管道,提供一致的开发体验。在创建物化表时,通过指定数据新鲜度和查询,Flink 引擎会自动推导出物化表的 Schema ,并创建相应的数据刷新管道,以达到指定的新鲜度。 + +{{< hint warning >}} +**注意**:该功能目前是一个 MVP(最小可行产品)功能,仅适用于连接到以 [Standalone]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}) 模式部署的 Flink 集群 的 [SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) +{{< /hint >}} + +# 核心概念 + +物化表包含以下核心概念:数据新鲜度、刷新模式、查询定义和 `Schema` 。 + +## 数据新鲜度 + +数据新鲜度定义了物化表数据相对于基础表更新的最大滞后时间。它并非绝对保证,而是 Flink 尝试达到的目标。框架会尽力确保物化表中的数据在指定的新鲜度内刷新。 + +数据新鲜度是物化表的一个关键属性,具有两个主要作用: +- **确定刷新模式**:目前有连续模式和全量模式。关于如何确定刷新模式的详细信息,请参阅 [materialized-table.refresh-mode.freshness-threshold]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold) 配置项。 + - 连续模式:启动 Flink 流作业,持续刷新物化表数据。 + - 全量模式:工作流调度器定期触发 Flink 批处理作业,全量刷新物化表数据。 +- **确定刷新频率**: + - 连续模式下,数据新鲜度转换为 Flink 流作业的 `checkpoint` 间隔。 + - 全量模式下,数据新鲜度转换为工作流的调度周期,例如 `cron` 表达式。 + +## 刷新模式 + +刷新模式有连续模式和全量模式两种。默认情况下,根据数据新鲜度推断刷新模式。用户可以为特定业务场景显式指定刷新模式,该模式将优先于数据新鲜度推断。 + +- **连续模式**:Flink 流作业会增量更新物化表数据,当下游数据为 checkpoint 完成才可见时,其数据刷新频率与作业的 checkpoint 间隔相匹配。 Review Comment: 当下游数据为 checkpoint 完成才可见时,其数据刷新频率与作业的 checkpoint 间隔相匹配 -> 下游数据会立即可见,或者等 checkpoint 完成时才可见,由对应的 Connector 行为决定。 ########## docs/content/docs/dev/table/materialized-table/quickstart.md: ########## @@ -0,0 +1,317 @@ +--- +title: Quickstart +weight: 3 +type: docs +aliases: +- /dev/table/materialized-table/quickstart.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Quickstart Guide + +This guide will help you quickly understand and get started with materialized tables. It includes setting up the environment and creating, altering, and dropping materialized tables in CONTINUOUS and FULL mode. + +# Architecture Introduction + +- **Client**: Could be any client that can interact with [Flink SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}), such as [SQL Client]({{< ref "docs/dev/table/sqlClient" >}}), [Flink JDBC Driver]({{< ref "docs/dev/table/jdbcDriver" >}}) and so on. +- **Flink SQL Gateway**: Supports creating, altering, and dropping materialized tables. It also serves as an embedded workflow scheduler to periodically refresh full mode materialized tables. +- **Flink Cluster**: The pipeline for refreshing materialized tables will run on the Flink cluster. +- **Catalog**: Manages the creation, retrieval, modification, and deletion of the metadata of materialized tables. +- **Catalog Store**: Supports catalog property persistence to automatically initialize catalogs for retrieving metadata in SQL operations. Review Comment: SQL operations -> materialized table related operations ########## docs/content.zh/docs/dev/table/materialized-table/overview.md: ########## @@ -0,0 +1,66 @@ +--- +title: 概览 +weight: 1 +type: docs +aliases: +- /dev/table/materialized-table/ +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# 介绍 + +物化表是 Flink SQL 引入的一种新的表类型,旨在简化批处理和流处理数据管道,提供一致的开发体验。在创建物化表时,通过指定数据新鲜度和查询,Flink 引擎会自动推导出物化表的 Schema ,并创建相应的数据刷新管道,以达到指定的新鲜度。 + +{{< hint warning >}} +**注意**:该功能目前是一个 MVP(最小可行产品)功能,仅适用于连接到以 [Standalone]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}) 模式部署的 Flink 集群 的 [SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) Review Comment: 仅适用于连接到以 [Standalone]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}) 模式部署的 Flink 集群 的 [SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}}) -> 仅在 [SQL Gateway]({{< ref "docs/dev/table/sql-gateway/overview" >}})中可用,并且只支持部署作业到 Flink [Standalone]({{< ref "docs/deployment/resource-providers/standalone/overview" >}})集群。 ########## docs/content.zh/docs/dev/table/materialized-table/statements.md: ########## @@ -0,0 +1,348 @@ +--- +title: 语法说明 +weight: 2 +type: docs +aliases: +- /dev/table/materialized-table/statements.html +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# 物化表语法 + +Flink SQL 目前支持以下物化表操作: +- [CREATE MATERIALIZED TABLE](#create-materialized-table) +- [ALTER MATERIALIZED TABLE](#alter-materialized-table) +- [DROP MATERIALIZED TABLE](#drop-materialized-table) + +# CREATE MATERIALIZED TABLE + +``` +CREATE MATERIALIZED TABLE [catalog_name.][db_name.]table_name + +[ ([ <table_constraint> ]) ] + +[COMMENT table_comment] + +[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)] + +[WITH (key1=val1, key2=val2, ...)] + +FRESHNESS = INTERVAL '<num>' { SECOND | MINUTE | HOUR | DAY } + +[REFRESH_MODE = { CONTINUOUS | FULL }] + +AS <select_statement> + +<table_constraint>: + [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED +``` + +## PRIMARY KEY + +`PRIMARY KEY` 定义了一组可选的列,用于唯一标识表中的每一行。主键列必须非空。 + +## PARTITIONED BY + +`PARTITIONED BY` 定义了一组可选的列,用于对物化表进行分区。如果物化表使用了 `filesystem connector` ,每个分区将创建一个目录。 + +**示例:** + +```sql +-- 创建一个物化表并将分区字段设置为 `ds`。 +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + FRESHNESS = INTERVAL '1' HOUR + AS SELECT + ds + FROM + ... +``` + +<span class="label label-danger">注意</span> +- 分区字段必须是物化表查询语句中的字段。 + +## WITH Options + +`WITH Options` 可以定义创建物化表所需的属性,包括[连接器参数]({{< ref "docs/connectors/table/" >}})和分区字段的[时间格式参数]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter)。 + +```sql +-- 创建一个物化表,指定分区字段为 'ds' 和对应的时间格式为 'yyyy-MM-dd' +CREATE MATERIALIZED TABLE my_materialized_table + PARTITIONED BY (ds) + WITH ( + 'format' = 'json', + 'partition.fields.ds.date-formatter' = 'yyyy-MM-dd' + ) + ... 
+``` + +如上例所示,我们为 `ds` 分区列指定了 `date-formatter` 选项。每次调度时,调度时间将转换为相应的 `ds` 分区值。例如,在 `2024-01-01 00:00:00` 的调度时间下,只有分区 `ds = '2024-01-01'` 会被刷新。 + +<span class="label label-danger">注意</span> +- `partition.fields.#.date-formatter` 选项仅适用于全量模式。 +- [partition.fields.#.date-formatter]({{< ref "docs/dev/table/config" >}}#partition-fields-date-formatter) 中的字段必须是有效的字符串类型分区字段。 + +## FRESHNESS + +`FRESHNESS` 用于指定物化表的数据新鲜度。 + +**数据新鲜度与刷新模式关系** + +数据新鲜度定义了物化表内容滞后于基础表更新的最长时间。它有两个作用,首先通过[配置]({{< ref "docs/dev/table/config" >}}#materialized-table-refresh-mode-freshness-threshold)确定物化表的[刷新模式]({{< ref "docs/dev/table/materialized-table/overview" >}}#刷新模式),然后确定数据刷新频率以满足实际数据新鲜度要求。 + +**FRESHNESS 参数详解** + +FRESHNESS 参数的取值范围为 INTERVAL `'<num>'` { SECOND | MINUTE | HOUR | DAY }。`'<num>'` 必须为正整数,并且在全量模式下,`'<num>'` 应该是相应时间间隔单位的公约数。 + +**示例:** +(假定 `materialized-table.refresh-mode.freshness-threshold` 为 30 分钟) + +```sql +-- 对应的刷新管道是一个 checkpoint 间隔为 1 秒的流处理作业 +FRESHNESS = INTERVAL '1' SECOND + +-- 对应的刷新管道是一个 checkpoint 间隔为 1 分钟的流处理作业 +FRESHNESS = INTERVAL '1' MINUTE + +-- 对应的刷新管道是一个调度周期为 1 小时的调度工作流 +FRESHNESS = INTERVAL '1' HOUR + +-- 对应的刷新管道是一个调度周期为 1 天的调度工作流 +FRESHNESS = INTERVAL '1' DAY +``` + +**不合法的 `FRESHNESS` 示例:** + +```sql +-- 间隔为负数 +FRESHNESS = INTERVAL '-1' SECOND + +-- 间隔为0 +FRESHNESS = INTERVAL '0' SECOND + +-- 间隔为月或者年 +FRESHNESS = INTERVAL '1' MONTH +FRESHNESS = INTERVAL '1' YEAR + +-- 全量模式下,间隔不是对应时间间隔单位的公约数 +FRESHNESS = INTERVAL '60' SECOND +FRESHNESS = INTERVAL '5' HOUR +``` + +<span class="label label-danger">注意</span> +- 尽管物化表数据将尽可能在定义的新鲜度内刷新,但不能保证完全满足新鲜度要求。 +- 在连续模式下,数据新鲜度和 `checkpoint` 间隔一致,设置过短的数据新鲜度可能会对作业性能产生影响。此外,为了优化 `checkpoint` 性能,建议[开启 Changelog]({{< ref "docs/ops/state/state_backends" >}}#开启-changelog)。 +- 在全量模式下,数据新鲜度必须转换为 `cron` 表达式,因此目前仅支持在预定义时间跨度内的新鲜度间隔,这种设计确保了与 `cron` 能力的一致性。具体支持以下新鲜度间隔: + - 秒:30、15、10、5、2 和 1 秒间隔。 + - 分钟:30、15、10、5、2 和 1 分钟间隔。 + - 小时:8、4、2 和 1 小时间隔。 + - 天:1 天。 + +## REFRESH_MODE + +`REFRESH_MODE` 用于显式指定物化表的刷新模式。指定的模式优先于框架的自动推断,以满足特定场景的需求。 + 
+**示例:**
+(假定 `materialized-table.refresh-mode.freshness-threshold` 为 30 分钟)
+
+```sql
+-- 创建的物化表的刷新模式为连续模式,作业的 checkpoint 间隔为 1 小时。
+CREATE MATERIALIZED TABLE my_materialized_table
+    REFRESH_MODE = CONTINUOUS
+    FRESHNESS = INTERVAL '1' HOUR
+    AS SELECT
+    ...
+
+-- 创建的物化表的刷新模式为全量模式,作业的调度周期为 10 分钟。
+CREATE MATERIALIZED TABLE my_materialized_table
+    REFRESH_MODE = FULL
+    FRESHNESS = INTERVAL '10' MINUTE
+    AS SELECT
+    ...
+```
+
+## AS <select_statement>
+
+该子句用于从 select 查询中填充物化表数据。上游表可以是物化表、表或视图。select 语句支持所有 Flink SQL [查询]({{< ref "docs/dev/table/sql/queries/overview" >}})。
+
+**示例:**
+
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table
+    FRESHNESS = INTERVAL '10' SECOND
+    AS SELECT * FROM kafka_catalog.db1.kafka_table;
+```
+
+## 示例
+
+(假设 materialized-table.refresh-mode.freshness-threshold 为 30 分钟)
+
+创建一个数据新鲜度为 `10` 秒的物化表,推导出的刷新模式为连续模式:
+
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_continuous
+    PARTITIONED BY (ds)
+    WITH (
+        'format' = 'json',
+        'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
+    )
+    FRESHNESS = INTERVAL '10' SECOND
+    AS
+    SELECT
+        k.ds,
+        k.user_id,
+        COUNT(*) AS event_count,
+        SUM(k.amount) AS total_amount,
+        MAX(u.age) AS max_age
+    FROM
+        kafka_catalog.db1.kafka_table k
+    JOIN
+        user_catalog.db1.user_table u
+    ON
+        k.user_id = u.user_id
+    WHERE
+        k.event_type = 'purchase'
+    GROUP BY
+        k.ds, k.user_id
+```
+
+创建一个数据新鲜度为 `1` 小时的物化表,推导出的刷新模式为全量模式:
+
+```sql
+CREATE MATERIALIZED TABLE my_materialized_table_full
+    PARTITIONED BY (ds)
+    WITH (
+        'format' = 'json',
+        'partition.fields.ds.date-formatter' = 'yyyy-MM-dd'
+    )
+    FRESHNESS = INTERVAL '1' HOUR
+    AS
+    SELECT
+        p.ds,
+        p.product_id,
+        p.product_name,
+        AVG(s.sale_price) AS avg_sale_price,
+        SUM(s.quantity) AS total_quantity
+    FROM
+        paimon_catalog.db1.product_table p
+    LEFT JOIN
+        paimon_catalog.db1.sales_table s
+    ON
+        p.product_id = s.product_id
+    WHERE
+        p.category = 'electronics'
+    GROUP BY
+        p.ds, p.product_id,
+        p.product_name
+```
+
+## 限制
+- 不支持显式指定列
+- 不支持修改查询语句
+- 不支持在 select 查询中使用临时表、临时视图或临时函数
+
+# ALTER MATERIALIZED TABLE
+
+```
+ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND | RESUME [WITH (key1=val1, key2=val2, ...)] | REFRESH [PARTITION partition_spec]
+```
+
+`ALTER MATERIALIZED TABLE` 用于管理物化表。用户可以使用此命令暂停和恢复物化表的刷新管道,并手动触发数据刷新。
+
+## SUSPEND
+
+```
+ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name SUSPEND
+```
+
+`SUSPEND` 用于暂停物化表的后台刷新管道。
+
+**示例:**
+
+```sql
+-- 暂停前指定 SAVEPOINT 路径
+SET 'execution.checkpointing.savepoint-dir' = 'hdfs://savepoint_path';
+
+-- 暂停指定的物化表
+ALTER MATERIALIZED TABLE my_materialized_table SUSPEND;
+```
+
+<span class="label label-danger">注意</span>
+- 暂停连续模式的表时,默认会使用 `STOP WITH SAVEPOINT` 暂停作业,你需要使用[参数]({{< ref "docs/deployment/config" >}}#execution-checkpointing-savepoint-dir)设置 `SAVEPOINT` 保存路径。
+
+## RESUME
+
+```
+ALTER MATERIALIZED TABLE [catalog_name.][db_name.]table_name RESUME [WITH (key1=val1, key2=val2, ...)]
+```
+
+`RESUME` 用于恢复物化表的刷新管道。在恢复时,可以通过 `WITH` 指定临时参数,该参数仅对当前恢复的刷新管道生效,并不会持久化。

Review Comment: 在恢复时,可以通过 `WITH` 指定临时参数,该参数仅对当前恢复的刷新管道生效,并不会持久化。-> 在恢复时,可以通过 `WITH` 子句动态指定物化表的参数,该参数仅对当前恢复的刷新管道生效,并不会持久化到物化表中。