twalthr commented on code in PR #28287:
URL: https://github.com/apache/flink/pull/28287#discussion_r3413555198
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java:
##########
@@ -383,6 +383,31 @@ default void alterTable(
alterTable(tablePath, newTable, ignoreIfNotExists);
}
+ /**
+ * Converts an existing {@link CatalogTable} to a {@link
CatalogMaterializedTable} in place,
+ * preserving the catalog entry's identity and storage.
+ *
+ * <p>The default throws {@link UnsupportedOperationException}; catalogs
that support in-place
+ * conversion override it. Launching the refresh job for the new
materialized table is the
+ * executor's responsibility, not the catalog's.
+ *
+ * @param tableChanges structured delta between {@code originalTable} and
{@code
+ * materializedTable}, useful for incremental storage migration.
+ * @throws TableNotExistException if the table does not exist
+ * @throws CatalogException in case of any runtime exception
+ */
+ default void convertTableToMaterializedTable(
+ ObjectPath tablePath,
+ CatalogTable originalTable,
+ CatalogMaterializedTable materializedTable,
+ List<TableChange> tableChanges)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException(
Review Comment:
```suggestion
throw new CatalogException(
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/ConvertTableToMaterializedTableOperation.java:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.ExecutableOperation;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Operation to describe an in-place conversion of an existing regular table
to a materialized table
+ * via CREATE OR ALTER MATERIALIZED TABLE.
+ */
+@Internal
+public class ConvertTableToMaterializedTableOperation
+ implements MaterializedTableOperation, ExecutableOperation {
+
+ private final ObjectIdentifier tableIdentifier;
+ private final ResolvedCatalogTable originalTable;
+ private final ResolvedCatalogMaterializedTable materializedTable;
+ private final Function<ResolvedCatalogMaterializedTable, List<TableChange>>
+ tableChangesForTable;
+ private List<TableChange> tableChanges;
+
+ public ConvertTableToMaterializedTableOperation(
+ ObjectIdentifier tableIdentifier,
+ ResolvedCatalogTable originalTable,
+ ResolvedCatalogMaterializedTable materializedTable,
+ Function<ResolvedCatalogMaterializedTable, List<TableChange>>
tableChangesBuilder) {
+ this.tableIdentifier = tableIdentifier;
+ this.originalTable = originalTable;
+ this.materializedTable = materializedTable;
+ this.tableChangesForTable = tableChangesBuilder;
+ }
+
+ @Override
+ public TableResultInternal execute(Context ctx) {
+ ctx.getCatalogManager()
+ .convertTableToMaterializedTable(
+ originalTable, materializedTable, getTableChanges(),
tableIdentifier);
+ return TableResultImpl.TABLE_RESULT_OK;
+ }
+
+ public ObjectIdentifier getTableIdentifier() {
+ return tableIdentifier;
+ }
+
+ public ResolvedCatalogTable getOriginalTable() {
+ return originalTable;
+ }
+
+ public ResolvedCatalogMaterializedTable getMaterializedTable() {
+ return materializedTable;
+ }
+
+ public List<TableChange> getTableChanges() {
+ if (tableChanges == null) {
+ tableChanges = tableChangesForTable.apply(materializedTable);
+ }
+ return tableChanges;
+ }
+
+ @Override
+ public String asSummaryString() {
+ final Map<String, Object> params = new LinkedHashMap<>();
+ params.put("identifier", tableIdentifier);
+ params.put("materializedTable", materializedTable);
+ params.put(
+ "tableChanges",
+ getTableChanges().stream()
+ .map(AlterMaterializedTableChangeOperation::toString)
+ .collect(Collectors.joining(",\n")));
+ return OperationUtils.formatWithChildren(
+ "CONVERT TABLE TO MATERIALIZED TABLE",
Review Comment:
Usually the summary should represent the original SQL.
```suggestion
"CREATE OR ALTER MATERIALIZED TABLE",
```
##########
docs/content.zh/docs/sql/materialized-table/statements.md:
##########
@@ -260,6 +261,129 @@ The operation updates the materialized table similarly to
[ALTER MATERIALIZED TA
See [ALTER MATERIALIZED TABLE AS](#as-select_statement-1) for more details.
+## Converting a Table to a Materialized Table
+
+This lets you adopt a materialized table on top of a table that already
exists, without dropping and recreating it.
+
+`CREATE OR ALTER MATERIALIZED TABLE` can convert an existing regular table
into a materialized table in place. The catalog object keeps its identity and
underlying storage. Its kind becomes materialized table, and its schema,
options, query definition, freshness, and refresh mode are taken from the
conversion statement, exactly as for a newly created materialized table. After
the conversion, a refresh job is launched just as it is for a newly created
materialized table.
+
+**Enabling conversion**
+
+Conversion is disabled by default. It is a one-way operation: it permanently
turns a regular table into a materialized table and cannot be undone, because
there is no operation that converts a materialized table back into a regular
table. Keeping it off by default also preserves source compatibility. A `CREATE
OR ALTER MATERIALIZED TABLE` that happens to name an existing regular table
keeps its previous behavior of being rejected, so no existing workflow silently
changes meaning until you opt in.
+
+When conversion is disabled, `CREATE OR ALTER MATERIALIZED TABLE` against a
regular table is rejected. To enable it, set:
+
+```yaml
+table.materialized-table.conversion-from-table.enabled: true
+```
+
+The option is read at planning time from the session's root configuration, so
it must be set when the `TableEnvironment` session is initialized. Set it in
the cluster configuration file `config.yaml`, or in the configuration used to
create the session. Changing it afterwards with a session-level `SET` statement
has no effect.
+
+**Schema**
+
+The schema comes from the `CREATE OR ALTER MATERIALIZED TABLE` statement and
its query, exactly as for a brand-new materialized table. Nothing is taken from
the source table. These are the same rules `CREATE MATERIALIZED TABLE` already
uses.
+
+The examples below read from a source table `orders` and convert an existing
regular table `user_spending`:
+
+```sql
+-- Source table the query reads from
+CREATE TABLE orders (
+ user_id BIGINT NOT NULL,
+ amount BIGINT,
+ order_time TIMESTAMP(3)
+) WITH (
+ 'connector' = '...'
+);
+
+-- The existing regular table to convert. It has a primary key and a watermark,
+-- which the conversion does not carry over.
+CREATE TABLE user_spending (
+ user_id BIGINT NOT NULL,
+ total_amount BIGINT,
+ last_order TIMESTAMP(3),
+ PRIMARY KEY (user_id) NOT ENFORCED,
+ WATERMARK FOR last_order AS last_order - INTERVAL '5' SECOND
+) WITH (
+ 'connector' = '...'
+);
+```
+
+With no column list, the schema is exactly the query output:
+
+```sql
+CREATE OR ALTER MATERIALIZED TABLE user_spending
+ AS SELECT user_id, SUM(amount) AS total_amount FROM orders GROUP BY
user_id;
+-- columns: user_id, total_amount
+-- the source's last_order column, primary key, and watermark are not carried
over
+```
+
+To keep a primary key or watermark, re-declare it in the conversion statement.
A watermark needs a rowtime column, so the query must produce one:
+
+```sql
+CREATE OR ALTER MATERIALIZED TABLE user_spending (
+ PRIMARY KEY (user_id) NOT ENFORCED,
+ WATERMARK FOR last_order AS last_order - INTERVAL '5' SECOND
Review Comment:
This is a very bad example: declaring a watermark on an aggregation column
will never work in reality. Maybe simplify the query to:
```
SELECT user_id, order_time
FROM orders
```
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java:
##########
@@ -264,6 +264,29 @@ public void alterTable(
}
}
+ @Override
+ public void convertTableToMaterializedTable(
+ ObjectPath tablePath,
+ CatalogTable originalTable,
+ CatalogMaterializedTable materializedTable,
+ List<TableChange> tableChanges)
+ throws TableNotExistException {
+ checkNotNull(tablePath);
+ checkNotNull(materializedTable);
+
+ final CatalogBaseTable existing = tables.get(tablePath);
+ if (existing == null) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ if (existing.getTableKind() != CatalogBaseTable.TableKind.TABLE) {
+ throw new CatalogException(
+ String.format(
+ "Cannot convert %s to a materialized table:
existing entry has kind %s.",
+ tablePath.getFullName(), existing.getTableKind()));
+ }
Review Comment:
This should not be implemented by every catalog. The framework should take
care of this. A catalog should be able to assume a validated operation.
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java:
##########
@@ -1404,6 +1404,44 @@ public void alterTable(
"AlterTable");
}
+ /**
+ * Converts an existing regular table to a materialized table in place.
Identity and storage are
+ * preserved; only the kind and the materialized-table specific metadata
change.
+ *
+ * @param originalTable the existing regular table
+ * @param materializedTable the new materialized table definition
+ * @param changes describe the modification from originalTable to
materializedTable
+ * @param objectIdentifier fully qualified path of the table being
converted
+ */
+ public void convertTableToMaterializedTable(
+ CatalogTable originalTable,
+ CatalogMaterializedTable materializedTable,
+ List<TableChange> changes,
+ ObjectIdentifier objectIdentifier) {
+ execute(
+ (catalog, path) -> {
+ final CatalogTable resolvedOriginal =
+ (CatalogTable)
resolveCatalogBaseTable(originalTable);
+ final CatalogMaterializedTable resolvedMt =
+ (CatalogMaterializedTable)
resolveCatalogBaseTable(materializedTable);
+ catalog.convertTableToMaterializedTable(
+ path, resolvedOriginal, resolvedMt, changes);
+ catalogModificationListeners.forEach(
+ listener ->
+ listener.onEvent(
+ AlterTableEvent.createEvent(
Review Comment:
I also don't fully understand who uses these listeners. But marking this as
an AlterTableEvent sounds good to me.
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlNodeToOperationConvertTableToMaterializedTableTest.java:
##########
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.expressions.SqlCallExpression;
+import org.apache.flink.table.operations.Operation;
+import
org.apache.flink.table.operations.materializedtable.ConvertTableToMaterializedTableOperation;
+import
org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import
org.apache.flink.table.operations.materializedtable.FullAlterMaterializedTableOperation;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for in-place conversion of a regular table to a materialized table
via {@code CREATE OR
+ * ALTER MATERIALIZED TABLE}.
+ */
+class SqlNodeToOperationConvertTableToMaterializedTableTest
+ extends SqlNodeToOperationConversionTestBase {
+
+ private static final String SOURCE_REGULAR_TABLE_NAME = "src_table";
+
+ @BeforeEach
+ void before() throws TableAlreadyExistException, DatabaseNotExistException
{
+ super.before();
+ sourceTable(SOURCE_REGULAR_TABLE_NAME).create();
+ sourceTable("t1_with_ts").create();
+ }
+
+ @Nested
+ class OperationSelection {
+ private static final String EXISTING_MT_NAME = "existing_mt";
+
+ @Test
+ void missingTargetCreatesMaterializedTable() {
+ final String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE brand_new" + " AS
SELECT a, b FROM t1";
+
assertThat(parse(sql)).isInstanceOf(CreateMaterializedTableOperation.class);
+ }
+
+ @Test
+ void existingMaterializedTableAlters()
+ throws TableAlreadyExistException, DatabaseNotExistException {
+ configureConversionEnabled(true);
+ createExistingMaterializedTable();
+ final String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE "
+ + EXISTING_MT_NAME
+ + " AS SELECT a, b FROM t1";
+
assertThat(parse(sql)).isInstanceOf(FullAlterMaterializedTableOperation.class);
+ }
+
+ @Test
+ void regularTableWithConversionDisabledIsRejected() {
+ configureConversionEnabled(false);
+ final String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE "
+ + SOURCE_REGULAR_TABLE_NAME
+ + " AS SELECT a, b FROM t1";
+ assertThatThrownBy(() -> parse(sql))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Regular table does not support create or alter
operation.");
+ }
+
+ @Test
+ void regularTableWithConversionEnabledIsConverted() {
+ configureConversionEnabled(true);
+ final String sql =
+ "CREATE OR ALTER MATERIALIZED TABLE "
+ + SOURCE_REGULAR_TABLE_NAME
+ + " AS SELECT a, b FROM t1";
+
assertThat(parse(sql)).isInstanceOf(ConvertTableToMaterializedTableOperation.class);
+ }
+
+ @Test
+ void viewIsRejected() throws TableAlreadyExistException,
DatabaseNotExistException {
+ // A view is rejected regardless of the conversion flag: only
tables convert.
+ configureConversionEnabled(true);
+ final CatalogView view =
+ CatalogView.of(
+ Schema.newBuilder()
+ .column("a", DataTypes.BIGINT())
+ .column("b", DataTypes.STRING())
+ .build(),
+ null,
+ "SELECT a, b FROM t1",
+ "SELECT a, b FROM t1",
+ Map.of());
+ catalog.createTable(
+ new ObjectPath(catalogManager.getCurrentDatabase(),
"src_view"), view, false);
+
+ assertThatThrownBy(
+ () ->
+ parse(
+ "CREATE OR ALTER MATERIALIZED
TABLE src_view"
+ + " AS SELECT a, b FROM
t1"))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "VIEW does not support the CREATE OR ALTER
MATERIALIZED TABLE operation.");
+ }
+
+ private void createExistingMaterializedTable()
+ throws TableAlreadyExistException, DatabaseNotExistException {
+ final String sql =
+ "CREATE MATERIALIZED TABLE existing_mt (\n"
+ + " CONSTRAINT pk1 PRIMARY KEY(a) NOT ENFORCED\n"
+ + ")\n"
+ + "AS SELECT a, b FROM t1";
+ final Operation op = parse(sql);
+
assertThat(op).isInstanceOf(CreateMaterializedTableOperation.class);
+ final CatalogMaterializedTable mt =
+ ((CreateMaterializedTableOperation)
op).getCatalogMaterializedTable();
+ catalog.createTable(
+ new ObjectPath(catalogManager.getCurrentDatabase(),
EXISTING_MT_NAME),
+ mt,
+ true);
+ }
+ }
+
+ @Nested
+ class ConfigScope {
+
+ @Test
+ void sessionOnlyEnableHasNoEffect() {
+ tableConfig.set(
+
TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED, true);
+ // root configuration left default (false)
+
+ assertThatThrownBy(() -> parse(conversionSql()))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Regular table does not support create or alter
operation.");
+ }
+
+ @Test
+ void clusterRootEnableAllowsConversion() {
+ configureConversionEnabled(true);
+
+ assertThat(parse(conversionSql()))
+
.isInstanceOf(ConvertTableToMaterializedTableOperation.class);
+ }
+
+ @Test
+ void bothSessionAndClusterEnabledAllowsConversion() {
+ tableConfig.set(
+
TableConfigOptions.MATERIALIZED_TABLE_CONVERSION_FROM_TABLE_ENABLED, true);
+ configureConversionEnabled(true);
+
+ assertThat(parse(conversionSql()))
+
.isInstanceOf(ConvertTableToMaterializedTableOperation.class);
+ }
+
+ @Test
+ void neitherSessionNorClusterEnabledIsRejected() {
+ // nothing set
+ assertThatThrownBy(() -> parse(conversionSql()))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining(
+ "Regular table does not support create or alter
operation.");
+ }
+
+ private String conversionSql() {
+ return "CREATE OR ALTER MATERIALIZED TABLE "
+ + SOURCE_REGULAR_TABLE_NAME
+ + " AS SELECT a, b FROM t1";
+ }
+ }
+
+ @Nested
+ class WatermarkAndPrimaryKey {
+
+ @BeforeEach
+ void enableConversion() {
+ configureConversionEnabled(true);
+ }
+
+ @Test
+ void multipleWatermarksAreRejected()
Review Comment:
Multiple watermarks are already checked by the SchemaResolver. We want to test
this PR, not the schema resolver. Please reduce the number of tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]