lsyldliu commented on code in PR #25880: URL: https://github.com/apache/flink/pull/25880#discussion_r1904023180
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java: ########## @@ -1374,4 +1391,24 @@ public String toString() { + '}'; } } + + /* A table change to modify the definition query. */ Review Comment: ```suggestion /** A table change to modify the definition query. */ ``` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + private final List<TableChange> columnChanges; Review Comment: Use MaterializedTableChange ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; + +import org.apache.calcite.sql.SqlNode; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */ +public class SqlAlterMaterializedTableAsQueryConverter + implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> { + + @Override + public Operation convertSqlNode( + SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery, + ConvertContext context) { + ObjectIdentifier identifier = resolveIdentifier(sqlAlterMaterializedTableAsQuery, context); + + // Validate and extract schema from query + String originalQuery = + context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery()); + SqlNode validatedQuery = + context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery()); + String definitionQuery = context.expandSqlIdentifiers(originalQuery); + PlannerQueryOperation queryOperation = + new PlannerQueryOperation( + context.toRelRoot(validatedQuery).project(), () -> originalQuery); + + ResolvedCatalogMaterializedTable oldTable = + getResolvedMaterializedTable(context, identifier); + List<Column> addedColumns = + validateAndExtractNewColumns( + oldTable.getResolvedSchema(), queryOperation.getResolvedSchema()); + + // Build new materialized table and apply changes + CatalogMaterializedTable updatedTable = + buildUpdatedMaterializedTable(oldTable, addedColumns, definitionQuery); + + return new AlterMaterializedTableAsQueryOperation( + identifier, + addedColumns.stream().map(TableChange::add).collect(Collectors.toList()), + TableChange.modifyDefinitionQuery(definitionQuery), + updatedTable); + } + + private ObjectIdentifier resolveIdentifier( + SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlAlterTableAsQuery.fullTableName()); + return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + } + + private ResolvedCatalogMaterializedTable getResolvedMaterializedTable( + ConvertContext context, ObjectIdentifier identifier) { + ResolvedCatalogBaseTable<?> baseTable = + context.getCatalogManager().getTableOrError(identifier).getResolvedTable(); + if (MATERIALIZED_TABLE != baseTable.getTableKind()) { + throw new ValidationException( + "Only materialized table support modify definition query."); + } + return (ResolvedCatalogMaterializedTable) baseTable; + } + + private CatalogMaterializedTable buildUpdatedMaterializedTable( + ResolvedCatalogMaterializedTable oldTable, + List<Column> addedColumns, + String definitionQuery) { + + Schema.Builder newSchemaBuilder = + Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema()); + addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), col.getDataType())); + + return CatalogMaterializedTable.newBuilder() + .schema(newSchemaBuilder.build()) + .comment(oldTable.getComment()) + .partitionKeys(oldTable.getPartitionKeys()) + .options(oldTable.getOptions()) + .definitionQuery(definitionQuery) + .freshness(oldTable.getDefinitionFreshness()) + .logicalRefreshMode(oldTable.getLogicalRefreshMode()) + .refreshMode(oldTable.getRefreshMode()) + .refreshStatus(oldTable.getRefreshStatus()) + .refreshHandlerDescription(oldTable.getRefreshHandlerDescription().orElse(null)) + .serializedRefreshHandler(oldTable.getSerializedRefreshHandler()) + .build(); + } + + private List<Column> validateAndExtractNewColumns( + ResolvedSchema oldSchema, ResolvedSchema newSchema) { + List<Column> newAddedColumns = new ArrayList<>(); + int originalColumnSize = oldSchema.getColumns().size(); + int newColumnSize = newSchema.getColumns().size(); + + if (originalColumnSize > newColumnSize) { + throw new ValidationException( + String.format( + "Query modification failed: Column deletion is not allowed. " + + "When modifying a query, you can only append new columns at the end. " + + "Original schema has %d columns, but new schema has %d columns.", + originalColumnSize, newColumnSize)); + } + + for (int i = 0; i < oldSchema.getColumns().size(); i++) { + Column oldColumn = oldSchema.getColumns().get(i); + Column newColumn = newSchema.getColumns().get(i); + if (!oldColumn.equals(newColumn)) { + throw new ValidationException( + String.format( Review Comment: ```suggestion throw new ValidationException( String.format( "When modifying the query of a materialized table, currently only support appending columns at the end of the schema, dropping, renaming and reordering columns are not supported.\n" + "Column mismatch at position %d: Original column is [%s], but new column is [%s].", i, oldColumn, newColumn)); ``` ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; + +import org.apache.calcite.sql.SqlNode; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */ +public class SqlAlterMaterializedTableAsQueryConverter + implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> { + + @Override + public Operation convertSqlNode( + SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery, + ConvertContext context) { + ObjectIdentifier identifier = resolveIdentifier(sqlAlterMaterializedTableAsQuery, context); + + // Validate and extract schema from query + String originalQuery = + context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery()); + SqlNode validatedQuery = + context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery()); + String definitionQuery = context.expandSqlIdentifiers(originalQuery); Review Comment: please add the following comment ``` // The LATERAL operator was eliminated during sql validation, thus the unparsed SQL // does not contain LATERAL which is problematic, // the issue was resolved in CALCITE-4077 // (always treat the table function as implicitly LATERAL). ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,114 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { + // direct apply the alter operation + List<TableChange> changes = new ArrayList<>(op.getColumnChanges()); + changes.add(op.getDefinitionQueryChange()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, changes, op.getNewMaterializedTable()); + return operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, oldMaterializedTable); + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema & query definition & refresh handler + // we should reset the savepoint path after the alter operation + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler()); + ContinuousRefreshHandler resetHandler = + new ContinuousRefreshHandler( + refreshHandler.getExecutionTarget(), refreshHandler.getJobId()); + byte[] serializedBytes = serializeContinuousHandler(resetHandler); + + List<TableChange> tableChanges = new ArrayList<>(op.getColumnChanges()); + tableChanges.add(op.getDefinitionQueryChange()); + tableChanges.add( + TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), serializedBytes)); + CatalogMaterializedTable updatedMaterializedTable = + op.getNewMaterializedTable() + .copy( + op.getNewMaterializedTable().getRefreshStatus(), + resetHandler.asSummaryString(), + serializedBytes); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, tableChanges, updatedMaterializedTable); + operationExecutor.callExecutableOperation(handle, alterMaterializedTableChangeOperation); + + // 3. resume the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + try { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } catch (Exception e) { + // rollback the alter operation + LOG.warn( + "Failed to resume the continuous refresh job for materialized table {}, rollback the alter operation.", + tableIdentifier, + e); + AlterMaterializedTableChangeOperation rollbackChangeOperation = + generateRollbackAlterMaterializedTableOperation( + oldMaterializedTable, alterMaterializedTableChangeOperation); + operationExecutor.callExecutableOperation(handle, rollbackChangeOperation); + + throw new SqlExecutionException( + String.format( + "Failed to Alter Materialized Table As Query Operation for materialized table %s.", + tableIdentifier), + e); + } + } + + return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); + } + + private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedTableOperation( + CatalogMaterializedTable oldMaterializedTable, + AlterMaterializedTableChangeOperation op) { + List<TableChange> tableChanges = op.getTableChanges(); + List<TableChange> rollbackChanges = new ArrayList<>(); + + for (TableChange tableChange : tableChanges) { Review Comment: Why not rollback the definition query? ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,114 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { + // direct apply the alter operation + List<TableChange> changes = new ArrayList<>(op.getColumnChanges()); + changes.add(op.getDefinitionQueryChange()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, changes, op.getNewMaterializedTable()); + return operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, oldMaterializedTable); + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema & query definition & refresh handler + // we should reset the savepoint path after the alter operation + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler()); + ContinuousRefreshHandler resetHandler = + new ContinuousRefreshHandler( + refreshHandler.getExecutionTarget(), refreshHandler.getJobId()); + byte[] serializedBytes = serializeContinuousHandler(resetHandler); + + List<TableChange> tableChanges = new ArrayList<>(op.getColumnChanges()); + tableChanges.add(op.getDefinitionQueryChange()); + tableChanges.add( + TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), serializedBytes)); + CatalogMaterializedTable updatedMaterializedTable = + op.getNewMaterializedTable() + .copy( + op.getNewMaterializedTable().getRefreshStatus(), + resetHandler.asSummaryString(), + serializedBytes); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, tableChanges, updatedMaterializedTable); + operationExecutor.callExecutableOperation(handle, alterMaterializedTableChangeOperation); + + // 3. resume the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + try { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } catch (Exception e) { + // rollback the alter operation + LOG.warn( + "Failed to resume the continuous refresh job for materialized table {}, rollback the alter operation.", + tableIdentifier, + e); + AlterMaterializedTableChangeOperation rollbackChangeOperation = + generateRollbackAlterMaterializedTableOperation( + oldMaterializedTable, alterMaterializedTableChangeOperation); + operationExecutor.callExecutableOperation(handle, rollbackChangeOperation); + + throw new SqlExecutionException( + String.format( + "Failed to Alter Materialized Table As Query Operation for materialized table %s.", + tableIdentifier), + e); + } + } + + return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); + } + + private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedTableOperation( + CatalogMaterializedTable oldMaterializedTable, + AlterMaterializedTableChangeOperation op) { + List<TableChange> tableChanges = op.getTableChanges(); + List<TableChange> rollbackChanges = new ArrayList<>(); + + for (TableChange tableChange : tableChanges) { + if (tableChange instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) tableChange; + rollbackChanges.add(TableChange.dropColumn(addColumn.getColumn().getName())); + } else if (tableChange instanceof TableChange.ModifyRefreshHandler) { + rollbackChanges.add( Review Comment: This is not correct, we should use the refresh handler of rollback refresh job ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -1027,6 +1028,320 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"))); + + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testAlterMaterializedTableAsQueryInFullMode() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo( + Collections.singletonList( + Column.physical("order_amount_sum", DataTypes.INT()))); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); + // the refresh handler in full mode should be the same as the old one + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + } + + @Test + void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Alter materialized table suspend + String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND"; + OperationHandle alterMaterializedTableSuspendHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo( + Collections.singletonList( + Column.physical("order_amount_sum", DataTypes.INT()))); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); + + // the refresh handler in full mode should be the same as the old one + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + } + + @Test + void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path temporaryPath) + throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops (" + + " PRIMARY KEY (ds, user_id) not enforced)" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " coalesce(user_id, 0) as user_id,\n" + + " shop_id,\n" + + " coalesce(ds, '') as ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // verify background job is running + ContinuousRefreshHandler activeRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + oldTable.getSerializedRefreshHandler(), getClass().getClassLoader()); + waitUntilAllTasksAreRunning( + restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); + + // setup savepoint dir + String savepointDir = "file://" + temporaryPath.toAbsolutePath(); + String setupSavepointDDL = + "SET 'execution.checkpointing.savepoint-dir' = '" + savepointDir + "'"; + OperationHandle setupSavepointHandle = + service.executeStatement(sessionHandle, setupSavepointDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, setupSavepointHandle); + + String alterTableDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " coalesce(user_id, 0) as user_id,\n" + + " shop_id,\n" + + " coalesce(ds, '') as ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle alterTableHandle = + service.executeStatement(sessionHandle, alterTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterTableHandle); + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo(Collections.singletonList(Column.physical("pv", DataTypes.INT()))); + assertThat(newTable.getResolvedSchema().getPrimaryKey()) + .isEqualTo(oldTable.getResolvedSchema().getPrimaryKey()); + assertThat(newTable.getResolvedSchema().getWatermarkSpecs()) + .isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs()); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); + assertThat(oldTable.getSerializedRefreshHandler()) + .isNotEqualTo(newTable.getSerializedRefreshHandler()); + + // verify the new continuous job is start without savepoint + ContinuousRefreshHandler newContinuousRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + newTable.getSerializedRefreshHandler(), + Thread.currentThread().getContextClassLoader()); + Optional<String> restorePath = + getJobRestoreSavepointPath( + restClusterClient, newContinuousRefreshHandler.getJobId()); + assertThat(restorePath).isEmpty(); + + // drop the materialized table + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testAlterMaterializedTableAsQueryInContinuousModeWithSuspendStatus( + @TempDir Path temporaryPath) throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // verify background job is running + ContinuousRefreshHandler activeRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + oldTable.getSerializedRefreshHandler(), getClass().getClassLoader()); + waitUntilAllTasksAreRunning( + restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); + + // setup savepoint dir + String savepointDir = "file://" + temporaryPath.toAbsolutePath(); + String setupSavepointDDL = + "SET 'execution.checkpointing.savepoint-dir' = '" + savepointDir + "'"; + OperationHandle setupSavepointHandle = + service.executeStatement(sessionHandle, setupSavepointDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, setupSavepointHandle); + + // suspend materialized table + String suspendTableDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND"; + OperationHandle suspendTableHandle = + service.executeStatement(sessionHandle, suspendTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, suspendTableHandle); + + String alterTableDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle alterTableHandle = + service.executeStatement(sessionHandle, alterTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterTableHandle); + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo(Collections.singletonList(Column.physical("pv", DataTypes.INT()))); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); Review Comment: ditto ########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java: ########## @@ -394,7 +404,7 @@ static ModifyRefreshHandler modifyRefreshHandler( * </pre> */ @PublicEvolving - class AddColumn implements TableChange { + class AddColumn implements CatalogTableChange, MaterializedTableChange { Review Comment: Please give more java docs to explain it also support materialized table ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; + +import org.apache.calcite.sql.SqlNode; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */ +public class SqlAlterMaterializedTableAsQueryConverter + implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> { + + @Override + public Operation convertSqlNode( + SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery, + ConvertContext context) { + ObjectIdentifier identifier = resolveIdentifier(sqlAlterMaterializedTableAsQuery, context); + + // Validate and extract schema from query + String originalQuery = Review Comment: I think we can append a seperated commit to fix the bug in SqlCreateMaterializedTableConverter, and add a udtf test to verify this logic. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + private final List<TableChange> columnChanges; + + private final TableChange definitionQueryChange; + + private final CatalogMaterializedTable newMaterializedTable; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, + List<TableChange> columnChanges, + TableChange definitionQueryChange, + CatalogMaterializedTable newMaterializedTable) { + super(tableIdentifier); + this.columnChanges = columnChanges; + this.definitionQueryChange = definitionQueryChange; + this.newMaterializedTable = newMaterializedTable; + } + + public List<TableChange> getColumnChanges() { + return columnChanges; + } + + public TableChange getDefinitionQueryChange() { + return definitionQueryChange; + } + + public CatalogMaterializedTable getNewMaterializedTable() { + return newMaterializedTable; + } + + @Override + public TableResultInternal execute(Context ctx) { + throw new UnsupportedOperationException( + "AlterMaterializedTableAsQueryOperation doesn't support ExecutableOperation yet."); + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("identifier", tableIdentifier); + params.put("columnChanges", columnChanges); + params.put("definitionQueryChange", definitionQueryChange); + + return OperationUtils.formatWithChildren( + "ALTER MATERIALIZED TABLE", Review Comment: It would be better print "ALTER MATERIALIZED TABLE %s AS %s" here. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + private final List<TableChange> columnChanges; + + private final TableChange definitionQueryChange; Review Comment: We can put it the above list, If you need it, you can filter it from the list. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + private final List<TableChange> columnChanges; + + private final TableChange definitionQueryChange; + + private final CatalogMaterializedTable newMaterializedTable; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, + List<TableChange> columnChanges, + TableChange definitionQueryChange, + CatalogMaterializedTable newMaterializedTable) { + super(tableIdentifier); + this.columnChanges = columnChanges; + this.definitionQueryChange = definitionQueryChange; + this.newMaterializedTable = newMaterializedTable; + } + + public List<TableChange> getColumnChanges() { + return columnChanges; + } + + public TableChange getDefinitionQueryChange() { + return definitionQueryChange; + } + + public CatalogMaterializedTable getNewMaterializedTable() { + return newMaterializedTable; + } + + @Override + public TableResultInternal execute(Context ctx) { + throw new UnsupportedOperationException( + "AlterMaterializedTableAsQueryOperation doesn't support ExecutableOperation yet."); + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("identifier", tableIdentifier); + params.put("columnChanges", columnChanges); + params.put("definitionQueryChange", definitionQueryChange); + + return OperationUtils.formatWithChildren( + "ALTER MATERIALIZED TABLE", + params, + Collections.emptyList(), + Operation::asSummaryString); Review Comment: I think we only need to print the definition query here. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,114 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { + // direct apply the alter operation + List<TableChange> changes = new ArrayList<>(op.getColumnChanges()); + changes.add(op.getDefinitionQueryChange()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, changes, op.getNewMaterializedTable()); + return operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, oldMaterializedTable); + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema & query definition & refresh handler + // we should reset the savepoint path after the alter operation + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler()); + ContinuousRefreshHandler resetHandler = + new ContinuousRefreshHandler( + refreshHandler.getExecutionTarget(), refreshHandler.getJobId()); + byte[] serializedBytes = serializeContinuousHandler(resetHandler); + + List<TableChange> tableChanges = new ArrayList<>(op.getColumnChanges()); + tableChanges.add(op.getDefinitionQueryChange()); + tableChanges.add( + TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), serializedBytes)); + CatalogMaterializedTable updatedMaterializedTable = + op.getNewMaterializedTable() + .copy( + op.getNewMaterializedTable().getRefreshStatus(), Review Comment: You should use the refresh status and refresh handler that you get when you resume the refresh job. ########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation; +import org.apache.flink.table.planner.operations.PlannerQueryOperation; + +import org.apache.calcite.sql.SqlNode; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */ +public class SqlAlterMaterializedTableAsQueryConverter + implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> { + + @Override + public Operation convertSqlNode( + SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery, + ConvertContext context) { + ObjectIdentifier identifier = resolveIdentifier(sqlAlterMaterializedTableAsQuery, context); + + // Validate and extract schema from query + String originalQuery = + context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery()); + SqlNode validatedQuery = + context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery()); + String definitionQuery = context.expandSqlIdentifiers(originalQuery); + PlannerQueryOperation queryOperation = + new PlannerQueryOperation( + context.toRelRoot(validatedQuery).project(), () -> originalQuery); + + ResolvedCatalogMaterializedTable oldTable = + getResolvedMaterializedTable(context, identifier); + List<Column> addedColumns = + validateAndExtractNewColumns( + oldTable.getResolvedSchema(), queryOperation.getResolvedSchema()); + + // Build new materialized table and apply changes + CatalogMaterializedTable updatedTable = + buildUpdatedMaterializedTable(oldTable, addedColumns, definitionQuery); + + return new AlterMaterializedTableAsQueryOperation( + identifier, + addedColumns.stream().map(TableChange::add).collect(Collectors.toList()), + TableChange.modifyDefinitionQuery(definitionQuery), + updatedTable); + } + + private ObjectIdentifier resolveIdentifier( + SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlAlterTableAsQuery.fullTableName()); + return context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + } + + private ResolvedCatalogMaterializedTable getResolvedMaterializedTable( + ConvertContext context, ObjectIdentifier identifier) { + ResolvedCatalogBaseTable<?> baseTable = + context.getCatalogManager().getTableOrError(identifier).getResolvedTable(); + if (MATERIALIZED_TABLE != baseTable.getTableKind()) { + throw new ValidationException( + "Only materialized table support modify definition query."); + } + return (ResolvedCatalogMaterializedTable) baseTable; + } + + private CatalogMaterializedTable buildUpdatedMaterializedTable( + ResolvedCatalogMaterializedTable oldTable, + List<Column> addedColumns, + String definitionQuery) { + + Schema.Builder newSchemaBuilder = + Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema()); + addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), col.getDataType())); + + return CatalogMaterializedTable.newBuilder() + .schema(newSchemaBuilder.build()) + .comment(oldTable.getComment()) + .partitionKeys(oldTable.getPartitionKeys()) + .options(oldTable.getOptions()) + .definitionQuery(definitionQuery) + .freshness(oldTable.getDefinitionFreshness()) + .logicalRefreshMode(oldTable.getLogicalRefreshMode()) + .refreshMode(oldTable.getRefreshMode()) + .refreshStatus(oldTable.getRefreshStatus()) + .refreshHandlerDescription(oldTable.getRefreshHandlerDescription().orElse(null)) + .serializedRefreshHandler(oldTable.getSerializedRefreshHandler()) + .build(); + } + + private List<Column> validateAndExtractNewColumns( + ResolvedSchema oldSchema, ResolvedSchema newSchema) { + List<Column> newAddedColumns = new ArrayList<>(); + int originalColumnSize = oldSchema.getColumns().size(); + int newColumnSize = newSchema.getColumns().size(); + + if (originalColumnSize > newColumnSize) { + throw new ValidationException( Review Comment: ```suggestion throw new ValidationException( String.format( "When modifying the query of a materialized table, currently only support appending columns at the end of the schema, dropping, renaming and reordering columns are not supported.\n" + "Original schema has %d columns, but new derived schema has %d columns.", originalColumnSize, newColumnSize)); ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,114 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { + // direct apply the alter operation + List<TableChange> changes = new ArrayList<>(op.getColumnChanges()); + changes.add(op.getDefinitionQueryChange()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, changes, op.getNewMaterializedTable()); + return operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, oldMaterializedTable); + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema & query definition & refresh handler + // we should reset the savepoint path after the alter operation + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler()); + ContinuousRefreshHandler resetHandler = + new ContinuousRefreshHandler( + refreshHandler.getExecutionTarget(), refreshHandler.getJobId()); + byte[] serializedBytes = serializeContinuousHandler(resetHandler); + + List<TableChange> tableChanges = new ArrayList<>(op.getColumnChanges()); + tableChanges.add(op.getDefinitionQueryChange()); + tableChanges.add( + TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), serializedBytes)); + CatalogMaterializedTable updatedMaterializedTable = + op.getNewMaterializedTable() + .copy( + op.getNewMaterializedTable().getRefreshStatus(), + resetHandler.asSummaryString(), + serializedBytes); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, tableChanges, updatedMaterializedTable); + operationExecutor.callExecutableOperation(handle, alterMaterializedTableChangeOperation); + + // 3. resume the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + try { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } catch (Exception e) { + // rollback the alter operation + LOG.warn( + "Failed to resume the continuous refresh job for materialized table {}, rollback the alter operation.", + tableIdentifier, + e); + AlterMaterializedTableChangeOperation rollbackChangeOperation = + generateRollbackAlterMaterializedTableOperation( + oldMaterializedTable, alterMaterializedTableChangeOperation); + operationExecutor.callExecutableOperation(handle, rollbackChangeOperation); + + throw new SqlExecutionException( + String.format( + "Failed to Alter Materialized Table As Query Operation for materialized table %s.", + tableIdentifier), + e); + } + } + + return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false); + } + + private AlterMaterializedTableChangeOperation generateRollbackAlterMaterializedTableOperation( + CatalogMaterializedTable oldMaterializedTable, + AlterMaterializedTableChangeOperation op) { + List<TableChange> tableChanges = op.getTableChanges(); Review Comment: Use MaterializedTableChange ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,114 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { + // direct apply the alter operation + List<TableChange> changes = new ArrayList<>(op.getColumnChanges()); + changes.add(op.getDefinitionQueryChange()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, changes, op.getNewMaterializedTable()); + return operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, oldMaterializedTable); + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema & query definition & refresh handler + // we should reset the savepoint path after the alter operation + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler()); + ContinuousRefreshHandler resetHandler = + new ContinuousRefreshHandler( + refreshHandler.getExecutionTarget(), refreshHandler.getJobId()); + byte[] serializedBytes = serializeContinuousHandler(resetHandler); + + List<TableChange> tableChanges = new ArrayList<>(op.getColumnChanges()); + tableChanges.add(op.getDefinitionQueryChange()); + tableChanges.add( + TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), serializedBytes)); Review Comment: We don't need to modify the RefreshHandler, it has been updated when suspend the refresh job. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,114 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { + // direct apply the alter operation + List<TableChange> changes = new ArrayList<>(op.getColumnChanges()); + changes.add(op.getDefinitionQueryChange()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, changes, op.getNewMaterializedTable()); + return operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, oldMaterializedTable); + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema & query definition & refresh handler + // we should reset the savepoint path after the alter operation + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler()); + ContinuousRefreshHandler resetHandler = + new ContinuousRefreshHandler( + refreshHandler.getExecutionTarget(), refreshHandler.getJobId()); + byte[] serializedBytes = serializeContinuousHandler(resetHandler); + + List<TableChange> tableChanges = new ArrayList<>(op.getColumnChanges()); + tableChanges.add(op.getDefinitionQueryChange()); + tableChanges.add( + TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), serializedBytes)); + CatalogMaterializedTable updatedMaterializedTable = + op.getNewMaterializedTable() + .copy( + op.getNewMaterializedTable().getRefreshStatus(), + resetHandler.asSummaryString(), + serializedBytes); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, tableChanges, updatedMaterializedTable); + operationExecutor.callExecutableOperation(handle, alterMaterializedTableChangeOperation); + + // 3. resume the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != oldMaterializedTable.getRefreshStatus()) { + try { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } catch (Exception e) { + // rollback the alter operation + LOG.warn( + "Failed to resume the continuous refresh job for materialized table {}, rollback the alter operation.", + tableIdentifier, + e); + AlterMaterializedTableChangeOperation rollbackChangeOperation = + generateRollbackAlterMaterializedTableOperation( + oldMaterializedTable, alterMaterializedTableChangeOperation); + operationExecutor.callExecutableOperation(handle, rollbackChangeOperation); + + throw new SqlExecutionException( Review Comment: We also need to resume the original refresh job from savepoint. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -1027,6 +1028,320 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"))); + + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testAlterMaterializedTableAsQueryInFullMode() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo( + Collections.singletonList( + Column.physical("order_amount_sum", DataTypes.INT()))); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); Review Comment: I think we should verify the definition query is equals. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -1027,6 +1028,320 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"))); + + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testAlterMaterializedTableAsQueryInFullMode() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo( + Collections.singletonList( + Column.physical("order_amount_sum", DataTypes.INT()))); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); + // the refresh handler in full mode should be the same as the old one + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + } + + @Test + void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Alter materialized table suspend + String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND"; + OperationHandle alterMaterializedTableSuspendHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo( + Collections.singletonList( + Column.physical("order_amount_sum", DataTypes.INT()))); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); + + // the refresh handler in full mode should be the same as the old one + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + } + + @Test + void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path temporaryPath) + throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops (" + + " PRIMARY KEY (ds, user_id) not enforced)" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " coalesce(user_id, 0) as user_id,\n" + + " shop_id,\n" + + " coalesce(ds, '') as ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // verify background job is running + ContinuousRefreshHandler activeRefreshHandler = + ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( + oldTable.getSerializedRefreshHandler(), getClass().getClassLoader()); + waitUntilAllTasksAreRunning( + restClusterClient, JobID.fromHexString(activeRefreshHandler.getJobId())); + + // setup savepoint dir + String savepointDir = "file://" + temporaryPath.toAbsolutePath(); + String setupSavepointDDL = + "SET 'execution.checkpointing.savepoint-dir' = '" + savepointDir + "'"; + OperationHandle setupSavepointHandle = + service.executeStatement(sessionHandle, setupSavepointDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, setupSavepointHandle); + + String alterTableDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " coalesce(user_id, 0) as user_id,\n" + + " shop_id,\n" + + " coalesce(ds, '') as ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + OperationHandle alterTableHandle = + service.executeStatement(sessionHandle, alterTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterTableHandle); + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo(Collections.singletonList(Column.physical("pv", DataTypes.INT()))); + assertThat(newTable.getResolvedSchema().getPrimaryKey()) + .isEqualTo(oldTable.getResolvedSchema().getPrimaryKey()); + assertThat(newTable.getResolvedSchema().getWatermarkSpecs()) + .isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs()); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); Review Comment: ditto ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -1027,6 +1028,320 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"))); + + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testAlterMaterializedTableAsQueryInFullMode() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo( + Collections.singletonList( + Column.physical("order_amount_sum", DataTypes.INT()))); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); + // the refresh handler in full mode should be the same as the old one + assertThat(oldTable.getSerializedRefreshHandler()) + .isEqualTo(newTable.getSerializedRefreshHandler()); + assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness()); + } + + @Test + void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + // Alter materialized table suspend + String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND"; + OperationHandle alterMaterializedTableSuspendHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema())) + .isEqualTo( + Collections.singletonList( + Column.physical("order_amount_sum", DataTypes.INT()))); + assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery()); Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org