lsyldliu commented on code in PR #25880:
URL: https://github.com/apache/flink/pull/25880#discussion_r1904023180


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -1374,4 +1391,24 @@ public String toString() {
                     + '}';
         }
     }
+
+    /* A table change to modify the definition query. */

Review Comment:
   ```suggestion
       /** A table change to modify the definition query. */
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends 
AlterMaterializedTableOperation {
+
+    private final List<TableChange> columnChanges;

Review Comment:
   Use `MaterializedTableChange` here instead of `TableChange`.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */
+public class SqlAlterMaterializedTableAsQueryConverter
+        implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery,
+            ConvertContext context) {
+        ObjectIdentifier identifier = 
resolveIdentifier(sqlAlterMaterializedTableAsQuery, context);
+
+        // Validate and extract schema from query
+        String originalQuery =
+                
context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        SqlNode validatedQuery =
+                
context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+        PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        context.toRelRoot(validatedQuery).project(), () -> 
originalQuery);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                getResolvedMaterializedTable(context, identifier);
+        List<Column> addedColumns =
+                validateAndExtractNewColumns(
+                        oldTable.getResolvedSchema(), 
queryOperation.getResolvedSchema());
+
+        // Build new materialized table and apply changes
+        CatalogMaterializedTable updatedTable =
+                buildUpdatedMaterializedTable(oldTable, addedColumns, 
definitionQuery);
+
+        return new AlterMaterializedTableAsQueryOperation(
+                identifier,
+                
addedColumns.stream().map(TableChange::add).collect(Collectors.toList()),
+                TableChange.modifyDefinitionQuery(definitionQuery),
+                updatedTable);
+    }
+
+    private ObjectIdentifier resolveIdentifier(
+            SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, 
ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlAlterTableAsQuery.fullTableName());
+        return 
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+    }
+
+    private ResolvedCatalogMaterializedTable getResolvedMaterializedTable(
+            ConvertContext context, ObjectIdentifier identifier) {
+        ResolvedCatalogBaseTable<?> baseTable =
+                
context.getCatalogManager().getTableOrError(identifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != baseTable.getTableKind()) {
+            throw new ValidationException(
+                    "Only materialized table support modify definition 
query.");
+        }
+        return (ResolvedCatalogMaterializedTable) baseTable;
+    }
+
+    private CatalogMaterializedTable buildUpdatedMaterializedTable(
+            ResolvedCatalogMaterializedTable oldTable,
+            List<Column> addedColumns,
+            String definitionQuery) {
+
+        Schema.Builder newSchemaBuilder =
+                
Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+        addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), 
col.getDataType()));
+
+        return CatalogMaterializedTable.newBuilder()
+                .schema(newSchemaBuilder.build())
+                .comment(oldTable.getComment())
+                .partitionKeys(oldTable.getPartitionKeys())
+                .options(oldTable.getOptions())
+                .definitionQuery(definitionQuery)
+                .freshness(oldTable.getDefinitionFreshness())
+                .logicalRefreshMode(oldTable.getLogicalRefreshMode())
+                .refreshMode(oldTable.getRefreshMode())
+                .refreshStatus(oldTable.getRefreshStatus())
+                
.refreshHandlerDescription(oldTable.getRefreshHandlerDescription().orElse(null))
+                
.serializedRefreshHandler(oldTable.getSerializedRefreshHandler())
+                .build();
+    }
+
+    private List<Column> validateAndExtractNewColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        List<Column> newAddedColumns = new ArrayList<>();
+        int originalColumnSize = oldSchema.getColumns().size();
+        int newColumnSize = newSchema.getColumns().size();
+
+        if (originalColumnSize > newColumnSize) {
+            throw new ValidationException(
+                    String.format(
+                            "Query modification failed: Column deletion is not 
allowed. "
+                                    + "When modifying a query, you can only 
append new columns at the end. "
+                                    + "Original schema has %d columns, but new 
schema has %d columns.",
+                            originalColumnSize, newColumnSize));
+        }
+
+        for (int i = 0; i < oldSchema.getColumns().size(); i++) {
+            Column oldColumn = oldSchema.getColumns().get(i);
+            Column newColumn = newSchema.getColumns().get(i);
+            if (!oldColumn.equals(newColumn)) {
+                throw new ValidationException(
+                        String.format(

Review Comment:
   ```suggestion
                   throw new ValidationException(
                           String.format(
                                   "When modifying the query of a materialized table, currently only support appending columns at the end of the schema, dropping, renaming and reordering columns are not supported.\n"
                                           + "Column mismatch at position %d: Original column is [%s], but new column is [%s].",
                                   i, oldColumn, newColumn));
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */
+public class SqlAlterMaterializedTableAsQueryConverter
+        implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery,
+            ConvertContext context) {
+        ObjectIdentifier identifier = 
resolveIdentifier(sqlAlterMaterializedTableAsQuery, context);
+
+        // Validate and extract schema from query
+        String originalQuery =
+                
context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        SqlNode validatedQuery =
+                
context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        String definitionQuery = context.expandSqlIdentifiers(originalQuery);

Review Comment:
   Please add the following comment:
   ```
           // The LATERAL operator was eliminated during sql validation, thus the unparsed SQL
           // does not contain LATERAL which is problematic,
           // the issue was resolved in CALCITE-4077
           // (always treat the table function as implicitly LATERAL).
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,114 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
+            // direct apply the alter operation
+            List<TableChange> changes = new ArrayList<>(op.getColumnChanges());
+            changes.add(op.getDefinitionQueryChange());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, changes, 
op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            suspendContinuousRefreshJob(
+                    operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema & query definition & refresh handler
+        // we should reset the savepoint path after the alter operation
+        ContinuousRefreshHandler refreshHandler =
+                
deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler());
+        ContinuousRefreshHandler resetHandler =
+                new ContinuousRefreshHandler(
+                        refreshHandler.getExecutionTarget(), 
refreshHandler.getJobId());
+        byte[] serializedBytes = serializeContinuousHandler(resetHandler);
+
+        List<TableChange> tableChanges = new 
ArrayList<>(op.getColumnChanges());
+        tableChanges.add(op.getDefinitionQueryChange());
+        tableChanges.add(
+                
TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), 
serializedBytes));
+        CatalogMaterializedTable updatedMaterializedTable =
+                op.getNewMaterializedTable()
+                        .copy(
+                                
op.getNewMaterializedTable().getRefreshStatus(),
+                                resetHandler.asSummaryString(),
+                                serializedBytes);
+        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                new AlterMaterializedTableChangeOperation(
+                        tableIdentifier, tableChanges, 
updatedMaterializedTable);
+        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+
+        // 3. resume the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            try {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } catch (Exception e) {
+                // rollback the alter operation
+                LOG.warn(
+                        "Failed to resume the continuous refresh job for 
materialized table {}, rollback the alter operation.",
+                        tableIdentifier,
+                        e);
+                AlterMaterializedTableChangeOperation rollbackChangeOperation =
+                        generateRollbackAlterMaterializedTableOperation(
+                                oldMaterializedTable, 
alterMaterializedTableChangeOperation);
+                operationExecutor.callExecutableOperation(handle, 
rollbackChangeOperation);
+
+                throw new SqlExecutionException(
+                        String.format(
+                                "Failed to Alter Materialized Table As Query 
Operation for materialized table %s.",
+                                tableIdentifier),
+                        e);
+            }
+        }
+
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private AlterMaterializedTableChangeOperation 
generateRollbackAlterMaterializedTableOperation(
+            CatalogMaterializedTable oldMaterializedTable,
+            AlterMaterializedTableChangeOperation op) {
+        List<TableChange> tableChanges = op.getTableChanges();
+        List<TableChange> rollbackChanges = new ArrayList<>();
+
+        for (TableChange tableChange : tableChanges) {

Review Comment:
   Why not roll back the definition query as well?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,114 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
+            // direct apply the alter operation
+            List<TableChange> changes = new ArrayList<>(op.getColumnChanges());
+            changes.add(op.getDefinitionQueryChange());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, changes, 
op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            suspendContinuousRefreshJob(
+                    operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema & query definition & refresh handler
+        // we should reset the savepoint path after the alter operation
+        ContinuousRefreshHandler refreshHandler =
+                
deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler());
+        ContinuousRefreshHandler resetHandler =
+                new ContinuousRefreshHandler(
+                        refreshHandler.getExecutionTarget(), 
refreshHandler.getJobId());
+        byte[] serializedBytes = serializeContinuousHandler(resetHandler);
+
+        List<TableChange> tableChanges = new 
ArrayList<>(op.getColumnChanges());
+        tableChanges.add(op.getDefinitionQueryChange());
+        tableChanges.add(
+                
TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), 
serializedBytes));
+        CatalogMaterializedTable updatedMaterializedTable =
+                op.getNewMaterializedTable()
+                        .copy(
+                                
op.getNewMaterializedTable().getRefreshStatus(),
+                                resetHandler.asSummaryString(),
+                                serializedBytes);
+        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                new AlterMaterializedTableChangeOperation(
+                        tableIdentifier, tableChanges, 
updatedMaterializedTable);
+        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+
+        // 3. resume the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            try {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } catch (Exception e) {
+                // rollback the alter operation
+                LOG.warn(
+                        "Failed to resume the continuous refresh job for 
materialized table {}, rollback the alter operation.",
+                        tableIdentifier,
+                        e);
+                AlterMaterializedTableChangeOperation rollbackChangeOperation =
+                        generateRollbackAlterMaterializedTableOperation(
+                                oldMaterializedTable, 
alterMaterializedTableChangeOperation);
+                operationExecutor.callExecutableOperation(handle, 
rollbackChangeOperation);
+
+                throw new SqlExecutionException(
+                        String.format(
+                                "Failed to Alter Materialized Table As Query 
Operation for materialized table %s.",
+                                tableIdentifier),
+                        e);
+            }
+        }
+
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private AlterMaterializedTableChangeOperation 
generateRollbackAlterMaterializedTableOperation(
+            CatalogMaterializedTable oldMaterializedTable,
+            AlterMaterializedTableChangeOperation op) {
+        List<TableChange> tableChanges = op.getTableChanges();
+        List<TableChange> rollbackChanges = new ArrayList<>();
+
+        for (TableChange tableChange : tableChanges) {
+            if (tableChange instanceof TableChange.AddColumn) {
+                TableChange.AddColumn addColumn = (TableChange.AddColumn) 
tableChange;
+                
rollbackChanges.add(TableChange.dropColumn(addColumn.getColumn().getName()));
+            } else if (tableChange instanceof 
TableChange.ModifyRefreshHandler) {
+                rollbackChanges.add(

Review Comment:
   This is not correct; we should use the refresh handler of the rollback refresh job.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1027,6 +1028,320 @@ void 
testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E
                                         fileSystemCatalogName,
                                         TEST_DEFAULT_DATABASE,
                                         "users_shops")));
+
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), 
Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS 
ds, order_id, 1 as order_amount FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(
+                        Collections.singletonList(
+                                Column.physical("order_amount_sum", 
DataTypes.INT())));
+        
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+        // the refresh handler in full mode should be the same as the old one
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws 
Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), 
Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table suspend
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS 
ds, order_id, 1 as order_amount FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(
+                        Collections.singletonList(
+                                Column.physical("order_amount_sum", 
DataTypes.INT())));
+        
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+
+        // the refresh handler in full mode should be the same as the old one
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path 
temporaryPath)
+            throws Exception {
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops ("
+                        + " PRIMARY KEY (ds, user_id) not enforced)"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  coalesce(user_id, 0) as user_id,\n"
+                        + "  shop_id,\n"
+                        + "  coalesce(ds, '') as ds,\n"
+                        + "  SUM (payment_amount_cents) AS payed_buy_fee_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // verify background job is running
+        ContinuousRefreshHandler activeRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        oldTable.getSerializedRefreshHandler(), 
getClass().getClassLoader());
+        waitUntilAllTasksAreRunning(
+                restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
+
+        // setup savepoint dir
+        String savepointDir = "file://" + temporaryPath.toAbsolutePath();
+        String setupSavepointDDL =
+                "SET 'execution.checkpointing.savepoint-dir' = '" + 
savepointDir + "'";
+        OperationHandle setupSavepointHandle =
+                service.executeStatement(sessionHandle, setupSavepointDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
setupSavepointHandle);
+
+        String alterTableDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  coalesce(user_id, 0) as user_id,\n"
+                        + "  shop_id,\n"
+                        + "  coalesce(ds, '') as ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle alterTableHandle =
+                service.executeStatement(sessionHandle, alterTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, alterTableHandle);
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(Collections.singletonList(Column.physical("pv", 
DataTypes.INT())));
+        assertThat(newTable.getResolvedSchema().getPrimaryKey())
+                .isEqualTo(oldTable.getResolvedSchema().getPrimaryKey());
+        assertThat(newTable.getResolvedSchema().getWatermarkSpecs())
+                .isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs());
+        
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isNotEqualTo(newTable.getSerializedRefreshHandler());
+
+        // verify the new continuous job is start without savepoint
+        ContinuousRefreshHandler newContinuousRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        newTable.getSerializedRefreshHandler(),
+                        Thread.currentThread().getContextClassLoader());
+        Optional<String> restorePath =
+                getJobRestoreSavepointPath(
+                        restClusterClient, 
newContinuousRefreshHandler.getJobId());
+        assertThat(restorePath).isEmpty();
+
+        // drop the materialized table
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInContinuousModeWithSuspendStatus(
+            @TempDir Path temporaryPath) throws Exception {
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS payed_buy_fee_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // verify background job is running
+        ContinuousRefreshHandler activeRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        oldTable.getSerializedRefreshHandler(), 
getClass().getClassLoader());
+        waitUntilAllTasksAreRunning(
+                restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
+
+        // setup savepoint dir
+        String savepointDir = "file://" + temporaryPath.toAbsolutePath();
+        String setupSavepointDDL =
+                "SET 'execution.checkpointing.savepoint-dir' = '" + 
savepointDir + "'";
+        OperationHandle setupSavepointHandle =
+                service.executeStatement(sessionHandle, setupSavepointDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
setupSavepointHandle);
+
+        // suspend materialized table
+        String suspendTableDDL = "ALTER MATERIALIZED TABLE users_shops 
SUSPEND";
+        OperationHandle suspendTableHandle =
+                service.executeStatement(sessionHandle, suspendTableDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, suspendTableHandle);
+
+        String alterTableDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle alterTableHandle =
+                service.executeStatement(sessionHandle, alterTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, alterTableHandle);
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(Collections.singletonList(Column.physical("pv", 
DataTypes.INT())));
+        
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());

Review Comment:
   ditto



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/TableChange.java:
##########
@@ -394,7 +404,7 @@ static ModifyRefreshHandler modifyRefreshHandler(
      * </pre>
      */
     @PublicEvolving
-    class AddColumn implements TableChange {
+    class AddColumn implements CatalogTableChange, MaterializedTableChange {

Review Comment:
   Please extend the Javadoc to explain that it also supports materialized tables.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */
+public class SqlAlterMaterializedTableAsQueryConverter
+        implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery,
+            ConvertContext context) {
+        ObjectIdentifier identifier = 
resolveIdentifier(sqlAlterMaterializedTableAsQuery, context);
+
+        // Validate and extract schema from query
+        String originalQuery =

Review Comment:
   I think we can append a separate commit to fix the bug in 
SqlCreateMaterializedTableConverter, and add a UDTF test to verify this logic.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends 
AlterMaterializedTableOperation {
+
+    private final List<TableChange> columnChanges;
+
+    private final TableChange definitionQueryChange;
+
+    private final CatalogMaterializedTable newMaterializedTable;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier,
+            List<TableChange> columnChanges,
+            TableChange definitionQueryChange,
+            CatalogMaterializedTable newMaterializedTable) {
+        super(tableIdentifier);
+        this.columnChanges = columnChanges;
+        this.definitionQueryChange = definitionQueryChange;
+        this.newMaterializedTable = newMaterializedTable;
+    }
+
+    public List<TableChange> getColumnChanges() {
+        return columnChanges;
+    }
+
+    public TableChange getDefinitionQueryChange() {
+        return definitionQueryChange;
+    }
+
+    public CatalogMaterializedTable getNewMaterializedTable() {
+        return newMaterializedTable;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        throw new UnsupportedOperationException(
+                "AlterMaterializedTableAsQueryOperation doesn't support 
ExecutableOperation yet.");
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("identifier", tableIdentifier);
+        params.put("columnChanges", columnChanges);
+        params.put("definitionQueryChange", definitionQueryChange);
+
+        return OperationUtils.formatWithChildren(
+                "ALTER MATERIALIZED TABLE",

Review Comment:
   It would be better to print "ALTER MATERIALIZED TABLE %s AS %s" here.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends 
AlterMaterializedTableOperation {
+
+    private final List<TableChange> columnChanges;
+
+    private final TableChange definitionQueryChange;

Review Comment:
   We can put it in the above list; if you need it, you can filter it from the 
list.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends 
AlterMaterializedTableOperation {
+
+    private final List<TableChange> columnChanges;
+
+    private final TableChange definitionQueryChange;
+
+    private final CatalogMaterializedTable newMaterializedTable;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier,
+            List<TableChange> columnChanges,
+            TableChange definitionQueryChange,
+            CatalogMaterializedTable newMaterializedTable) {
+        super(tableIdentifier);
+        this.columnChanges = columnChanges;
+        this.definitionQueryChange = definitionQueryChange;
+        this.newMaterializedTable = newMaterializedTable;
+    }
+
+    public List<TableChange> getColumnChanges() {
+        return columnChanges;
+    }
+
+    public TableChange getDefinitionQueryChange() {
+        return definitionQueryChange;
+    }
+
+    public CatalogMaterializedTable getNewMaterializedTable() {
+        return newMaterializedTable;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        throw new UnsupportedOperationException(
+                "AlterMaterializedTableAsQueryOperation doesn't support 
ExecutableOperation yet.");
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("identifier", tableIdentifier);
+        params.put("columnChanges", columnChanges);
+        params.put("definitionQueryChange", definitionQueryChange);
+
+        return OperationUtils.formatWithChildren(
+                "ALTER MATERIALIZED TABLE",
+                params,
+                Collections.emptyList(),
+                Operation::asSummaryString);

Review Comment:
   I think we only need to print the definition query here.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,114 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
+            // direct apply the alter operation
+            List<TableChange> changes = new ArrayList<>(op.getColumnChanges());
+            changes.add(op.getDefinitionQueryChange());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, changes, 
op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            suspendContinuousRefreshJob(
+                    operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema & query definition & refresh handler
+        // we should reset the savepoint path after the alter operation
+        ContinuousRefreshHandler refreshHandler =
+                
deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler());
+        ContinuousRefreshHandler resetHandler =
+                new ContinuousRefreshHandler(
+                        refreshHandler.getExecutionTarget(), 
refreshHandler.getJobId());
+        byte[] serializedBytes = serializeContinuousHandler(resetHandler);
+
+        List<TableChange> tableChanges = new 
ArrayList<>(op.getColumnChanges());
+        tableChanges.add(op.getDefinitionQueryChange());
+        tableChanges.add(
+                
TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), 
serializedBytes));
+        CatalogMaterializedTable updatedMaterializedTable =
+                op.getNewMaterializedTable()
+                        .copy(
+                                
op.getNewMaterializedTable().getRefreshStatus(),

Review Comment:
   You should use the refresh status and refresh handler that you get when you 
resume the refresh job.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableAsQueryConverter.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableAsQuery;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import 
org.apache.flink.table.operations.materializedtable.AlterMaterializedTableAsQueryOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** A converter for {@link SqlAlterMaterializedTableAsQuery}. */
+public class SqlAlterMaterializedTableAsQueryConverter
+        implements SqlNodeConverter<SqlAlterMaterializedTableAsQuery> {
+
+    @Override
+    public Operation convertSqlNode(
+            SqlAlterMaterializedTableAsQuery sqlAlterMaterializedTableAsQuery,
+            ConvertContext context) {
+        ObjectIdentifier identifier = 
resolveIdentifier(sqlAlterMaterializedTableAsQuery, context);
+
+        // Validate and extract schema from query
+        String originalQuery =
+                
context.toQuotedSqlString(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        SqlNode validatedQuery =
+                
context.getSqlValidator().validate(sqlAlterMaterializedTableAsQuery.getAsQuery());
+        String definitionQuery = context.expandSqlIdentifiers(originalQuery);
+        PlannerQueryOperation queryOperation =
+                new PlannerQueryOperation(
+                        context.toRelRoot(validatedQuery).project(), () -> 
originalQuery);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                getResolvedMaterializedTable(context, identifier);
+        List<Column> addedColumns =
+                validateAndExtractNewColumns(
+                        oldTable.getResolvedSchema(), 
queryOperation.getResolvedSchema());
+
+        // Build new materialized table and apply changes
+        CatalogMaterializedTable updatedTable =
+                buildUpdatedMaterializedTable(oldTable, addedColumns, 
definitionQuery);
+
+        return new AlterMaterializedTableAsQueryOperation(
+                identifier,
+                
addedColumns.stream().map(TableChange::add).collect(Collectors.toList()),
+                TableChange.modifyDefinitionQuery(definitionQuery),
+                updatedTable);
+    }
+
+    private ObjectIdentifier resolveIdentifier(
+            SqlAlterMaterializedTableAsQuery sqlAlterTableAsQuery, 
ConvertContext context) {
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlAlterTableAsQuery.fullTableName());
+        return 
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
+    }
+
+    private ResolvedCatalogMaterializedTable getResolvedMaterializedTable(
+            ConvertContext context, ObjectIdentifier identifier) {
+        ResolvedCatalogBaseTable<?> baseTable =
+                
context.getCatalogManager().getTableOrError(identifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != baseTable.getTableKind()) {
+            throw new ValidationException(
+                    "Only materialized table support modify definition 
query.");
+        }
+        return (ResolvedCatalogMaterializedTable) baseTable;
+    }
+
+    private CatalogMaterializedTable buildUpdatedMaterializedTable(
+            ResolvedCatalogMaterializedTable oldTable,
+            List<Column> addedColumns,
+            String definitionQuery) {
+
+        Schema.Builder newSchemaBuilder =
+                
Schema.newBuilder().fromResolvedSchema(oldTable.getResolvedSchema());
+        addedColumns.forEach(col -> newSchemaBuilder.column(col.getName(), 
col.getDataType()));
+
+        return CatalogMaterializedTable.newBuilder()
+                .schema(newSchemaBuilder.build())
+                .comment(oldTable.getComment())
+                .partitionKeys(oldTable.getPartitionKeys())
+                .options(oldTable.getOptions())
+                .definitionQuery(definitionQuery)
+                .freshness(oldTable.getDefinitionFreshness())
+                .logicalRefreshMode(oldTable.getLogicalRefreshMode())
+                .refreshMode(oldTable.getRefreshMode())
+                .refreshStatus(oldTable.getRefreshStatus())
+                
.refreshHandlerDescription(oldTable.getRefreshHandlerDescription().orElse(null))
+                
.serializedRefreshHandler(oldTable.getSerializedRefreshHandler())
+                .build();
+    }
+
+    private List<Column> validateAndExtractNewColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        List<Column> newAddedColumns = new ArrayList<>();
+        int originalColumnSize = oldSchema.getColumns().size();
+        int newColumnSize = newSchema.getColumns().size();
+
+        if (originalColumnSize > newColumnSize) {
+            throw new ValidationException(

Review Comment:
   ```suggestion
                           throw new ValidationException(
                       String.format(
                               "When modifying the query of a materialized 
table, currently only support appending columns at the end of the schema, 
dropping, renaming and reordering columns are not supported.\n" +
                                       "Original schema has %d columns, but new 
derived schema has %d columns.",
                               originalColumnSize, newColumnSize));
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,114 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
+            // direct apply the alter operation
+            List<TableChange> changes = new ArrayList<>(op.getColumnChanges());
+            changes.add(op.getDefinitionQueryChange());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, changes, 
op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            suspendContinuousRefreshJob(
+                    operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema & query definition & refresh handler
+        // we should reset the savepoint path after the alter operation
+        ContinuousRefreshHandler refreshHandler =
+                
deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler());
+        ContinuousRefreshHandler resetHandler =
+                new ContinuousRefreshHandler(
+                        refreshHandler.getExecutionTarget(), 
refreshHandler.getJobId());
+        byte[] serializedBytes = serializeContinuousHandler(resetHandler);
+
+        List<TableChange> tableChanges = new 
ArrayList<>(op.getColumnChanges());
+        tableChanges.add(op.getDefinitionQueryChange());
+        tableChanges.add(
+                
TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), 
serializedBytes));
+        CatalogMaterializedTable updatedMaterializedTable =
+                op.getNewMaterializedTable()
+                        .copy(
+                                
op.getNewMaterializedTable().getRefreshStatus(),
+                                resetHandler.asSummaryString(),
+                                serializedBytes);
+        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                new AlterMaterializedTableChangeOperation(
+                        tableIdentifier, tableChanges, 
updatedMaterializedTable);
+        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+
+        // 3. resume the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            try {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } catch (Exception e) {
+                // rollback the alter operation
+                LOG.warn(
+                        "Failed to resume the continuous refresh job for 
materialized table {}, rollback the alter operation.",
+                        tableIdentifier,
+                        e);
+                AlterMaterializedTableChangeOperation rollbackChangeOperation =
+                        generateRollbackAlterMaterializedTableOperation(
+                                oldMaterializedTable, 
alterMaterializedTableChangeOperation);
+                operationExecutor.callExecutableOperation(handle, 
rollbackChangeOperation);
+
+                throw new SqlExecutionException(
+                        String.format(
+                                "Failed to Alter Materialized Table As Query 
Operation for materialized table %s.",
+                                tableIdentifier),
+                        e);
+            }
+        }
+
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private AlterMaterializedTableChangeOperation 
generateRollbackAlterMaterializedTableOperation(
+            CatalogMaterializedTable oldMaterializedTable,
+            AlterMaterializedTableChangeOperation op) {
+        List<TableChange> tableChanges = op.getTableChanges();

Review Comment:
   Use MaterializedTableChange



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,114 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
+            // direct apply the alter operation
+            List<TableChange> changes = new ArrayList<>(op.getColumnChanges());
+            changes.add(op.getDefinitionQueryChange());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, changes, 
op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            suspendContinuousRefreshJob(
+                    operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema & query definition & refresh handler
+        // we should reset the savepoint path after the alter operation
+        ContinuousRefreshHandler refreshHandler =
+                
deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler());
+        ContinuousRefreshHandler resetHandler =
+                new ContinuousRefreshHandler(
+                        refreshHandler.getExecutionTarget(), 
refreshHandler.getJobId());
+        byte[] serializedBytes = serializeContinuousHandler(resetHandler);
+
+        List<TableChange> tableChanges = new 
ArrayList<>(op.getColumnChanges());
+        tableChanges.add(op.getDefinitionQueryChange());
+        tableChanges.add(
+                
TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), 
serializedBytes));

Review Comment:
   We don't need to modify the RefreshHandler; it has already been updated when 
suspending the refresh job.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,114 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
+            // direct apply the alter operation
+            List<TableChange> changes = new ArrayList<>(op.getColumnChanges());
+            changes.add(op.getDefinitionQueryChange());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, changes, 
op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            suspendContinuousRefreshJob(
+                    operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema & query definition & refresh handler
+        // we should reset the savepoint path after the alter operation
+        ContinuousRefreshHandler refreshHandler =
+                
deserializeContinuousHandler(oldMaterializedTable.getSerializedRefreshHandler());
+        ContinuousRefreshHandler resetHandler =
+                new ContinuousRefreshHandler(
+                        refreshHandler.getExecutionTarget(), 
refreshHandler.getJobId());
+        byte[] serializedBytes = serializeContinuousHandler(resetHandler);
+
+        List<TableChange> tableChanges = new 
ArrayList<>(op.getColumnChanges());
+        tableChanges.add(op.getDefinitionQueryChange());
+        tableChanges.add(
+                
TableChange.modifyRefreshHandler(resetHandler.asSummaryString(), 
serializedBytes));
+        CatalogMaterializedTable updatedMaterializedTable =
+                op.getNewMaterializedTable()
+                        .copy(
+                                
op.getNewMaterializedTable().getRefreshStatus(),
+                                resetHandler.asSummaryString(),
+                                serializedBytes);
+        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                new AlterMaterializedTableChangeOperation(
+                        tableIdentifier, tableChanges, 
updatedMaterializedTable);
+        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+
+        // 3. resume the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != oldMaterializedTable.getRefreshStatus()) {
+            try {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } catch (Exception e) {
+                // rollback the alter operation
+                LOG.warn(
+                        "Failed to resume the continuous refresh job for 
materialized table {}, rollback the alter operation.",
+                        tableIdentifier,
+                        e);
+                AlterMaterializedTableChangeOperation rollbackChangeOperation =
+                        generateRollbackAlterMaterializedTableOperation(
+                                oldMaterializedTable, 
alterMaterializedTableChangeOperation);
+                operationExecutor.callExecutableOperation(handle, 
rollbackChangeOperation);
+
+                throw new SqlExecutionException(

Review Comment:
   We also need to resume the original refresh job from the savepoint.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1027,6 +1028,320 @@ void 
testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E
                                         fileSystemCatalogName,
                                         TEST_DEFAULT_DATABASE,
                                         "users_shops")));
+
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), 
Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS 
ds, order_id, 1 as order_amount FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(
+                        Collections.singletonList(
+                                Column.physical("order_amount_sum", 
DataTypes.INT())));
+        
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());

Review Comment:
   I think we should verify that the definition query is equal to the expected query.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1027,6 +1028,320 @@ void 
testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E
                                         fileSystemCatalogName,
                                         TEST_DEFAULT_DATABASE,
                                         "users_shops")));
+
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, 
TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), 
Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS 
ds, order_id, 1 as order_amount FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(
+                        Collections.singletonList(
+                                Column.physical("order_amount_sum", 
DataTypes.INT())));
+        
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+        // the refresh handler in full mode should be the same as the old one
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws 
Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), 
Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table suspend
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS 
ds, order_id, 1 as order_amount FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, 
new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(
+                        Collections.singletonList(
+                                Column.physical("order_amount_sum", 
DataTypes.INT())));
+        
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+
+        // the refresh handler in full mode should be the same as the old one
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+        
assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInContinuousMode(@TempDir Path 
temporaryPath)
+            throws Exception {
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops ("
+                        + " PRIMARY KEY (ds, user_id) not enforced)"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  coalesce(user_id, 0) as user_id,\n"
+                        + "  shop_id,\n"
+                        + "  coalesce(ds, '') as ds,\n"
+                        + "  SUM (payment_amount_cents) AS payed_buy_fee_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // verify background job is running
+        ContinuousRefreshHandler activeRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        oldTable.getSerializedRefreshHandler(), 
getClass().getClassLoader());
+        waitUntilAllTasksAreRunning(
+                restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
+
+        // setup savepoint dir
+        String savepointDir = "file://" + temporaryPath.toAbsolutePath();
+        String setupSavepointDDL =
+                "SET 'execution.checkpointing.savepoint-dir' = '" + 
savepointDir + "'";
+        OperationHandle setupSavepointHandle =
+                service.executeStatement(sessionHandle, setupSavepointDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
setupSavepointHandle);
+
+        String alterTableDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  coalesce(user_id, 0) as user_id,\n"
+                        + "  shop_id,\n"
+                        + "  coalesce(ds, '') as ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle alterTableHandle =
+                service.executeStatement(sessionHandle, alterTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, alterTableHandle);
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), 
oldTable.getResolvedSchema()))
+                .isEqualTo(Collections.singletonList(Column.physical("pv", 
DataTypes.INT())));
+        assertThat(newTable.getResolvedSchema().getPrimaryKey())
+                .isEqualTo(oldTable.getResolvedSchema().getPrimaryKey());
+        assertThat(newTable.getResolvedSchema().getWatermarkSpecs())
+                .isEqualTo(oldTable.getResolvedSchema().getWatermarkSpecs());
+        
assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());

Review Comment:
   ditto



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1027,6 +1028,320 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E
                                         fileSystemCatalogName,
                                         TEST_DEFAULT_DATABASE,
                                         "users_shops")));
+
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema()))
+                .isEqualTo(
+                        Collections.singletonList(
+                                Column.physical("order_amount_sum", DataTypes.INT())));
+        assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());
+        // the refresh handler in full mode should be the same as the old one
+        assertThat(oldTable.getSerializedRefreshHandler())
+                .isEqualTo(newTable.getSerializedRefreshHandler());
+        assertThat(oldTable.getDefinitionFreshness()).isEqualTo(newTable.getDefinitionFreshness());
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullModeWithSuspendStatus() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Alter materialized table suspend
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle);
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS ds, order_id, 1 as order_amount FROM my_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(getAddedColumns(newTable.getResolvedSchema(), oldTable.getResolvedSchema()))
+                .isEqualTo(
+                        Collections.singletonList(
+                                Column.physical("order_amount_sum", DataTypes.INT())));
+        assertThat(oldTable.getDefinitionQuery()).isNotEqualTo(newTable.getDefinitionQuery());

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to