lsyldliu commented on code in PR #25880:
URL: https://github.com/apache/flink/pull/25880#discussion_r1900789652


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java:
##########
@@ -97,6 +97,8 @@ CatalogMaterializedTable copy(
             String refreshHandlerDescription,
             byte[] serializedRefreshHandler);
 
+    CatalogMaterializedTable copy(Schema schema, String definitionQuery);
+

Review Comment:
   This is a public interface; we can't introduce this method casually. Some connectors, such as Paimon, may implement this interface, and adding this method would result in incompatible behavior.
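   
   A minimal sketch of one backward-compatible option, in case the method does stay on the interface: make it a `default` method so existing implementers keep compiling (the interface name below is illustrative, not the real one):
   
   ```java
   import org.apache.flink.table.api.Schema;
   
   // Sketch only: a default method keeps existing implementers (e.g. Paimon)
   // source- and binary-compatible; they can override it once they support it.
   interface CatalogMaterializedTableSketch {
   
       default CatalogMaterializedTableSketch copy(Schema schema, String definitionQuery) {
           throw new UnsupportedOperationException(
                   "copy(Schema, String) is not supported by this implementation.");
       }
   }
   ```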



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {
+
+    protected final ObjectIdentifier tableIdentifier;
+
+    private final QueryOperation queryOperation;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier, QueryOperation queryOperation) {
+        super(tableIdentifier);
+        this.tableIdentifier = tableIdentifier;
+        this.queryOperation = queryOperation;
+    }
+
+    public QueryOperation getQueryOperation() {
+        return queryOperation;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table, does not support materialized table related operation.",
+                            tableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable oldResolvedMaterializedTable =
+                (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
+
+        // validate new schema and derived origin primary key and watermark spec
+        ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema();
+        ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema();
+        List<TableChange> tableChanges =
+                validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream()

Review Comment:
   Regarding the overall TableChange design (see the sketch after this list):
   1. I think we should introduce a new `CatalogTableChange` interface that extends TableChange, representing a modification of CatalogTable; TableChange represents a modification of CatalogBaseTable, and MaterializedTableChange represents a modification of CatalogMaterializedTable. Maybe in the future we will also introduce a CatalogViewChange.
   
   2. Changes common to CatalogTable and CatalogMaterializedTable, such as AddColumn, should implement both CatalogTableChange and MaterializedTableChange. Other modification operations only need to implement one of these two interfaces.
   
   3. For altering the materialized table query, we need to introduce a new MaterializedTableChange called `ModifyDefinitionQuery`.
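   
   A minimal sketch of the proposed hierarchy (all names except `TableChange` are proposals from this comment, not existing API):
   
   ```java
   // Sketch only: proposed split of the change interfaces.
   interface TableChange {}                                  // modification of CatalogBaseTable
   interface CatalogTableChange extends TableChange {}      // modification of CatalogTable
   interface MaterializedTableChange extends TableChange {} // modification of CatalogMaterializedTable
   
   // A change shared by both table kinds implements both sub-interfaces.
   final class AddColumn implements CatalogTableChange, MaterializedTableChange {}
   
   // A materialized-table-only change carrying the new definition query.
   final class ModifyDefinitionQuery implements MaterializedTableChange {
       private final String newDefinitionQuery;
   
       ModifyDefinitionQuery(String newDefinitionQuery) {
           this.newDefinitionQuery = newDefinitionQuery;
       }
   
       String getNewDefinitionQuery() {
           return newDefinitionQuery;
       }
   }
   ```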



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {
+
+    protected final ObjectIdentifier tableIdentifier;
+
+    private final QueryOperation queryOperation;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier, QueryOperation queryOperation) {
+        super(tableIdentifier);
+        this.tableIdentifier = tableIdentifier;
+        this.queryOperation = queryOperation;
+    }
+
+    public QueryOperation getQueryOperation() {
+        return queryOperation;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table, does not support materialized table related operation.",
+                            tableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable oldResolvedMaterializedTable =
+                (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
+
+        // validate new schema and derived origin primary key and watermark spec
+        ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema();
+        ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema();

Review Comment:
   Can we use the SchemaResolver to resolve the schema here?
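   
   Something along these lines, assuming the schema resolver is reachable from the operation context via `CatalogManager` (a sketch, not a verified call path):
   
   ```java
   // Build the unresolved schema and let the resolver derive the ResolvedSchema,
   // instead of assembling a ResolvedSchema by hand.
   Schema.Builder builder = Schema.newBuilder().fromResolvedSchema(resolvedQuerySchema);
   oldResolvedSchema
           .getPrimaryKey()
           .ifPresent(pk -> builder.primaryKeyNamed(pk.getName(), pk.getColumns()));
   // (the original watermark specs would be carried over similarly)
   ResolvedSchema newResolvedSchema =
           ctx.getCatalogManager().getSchemaResolver().resolve(builder.build());
   ```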



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {
+
+    protected final ObjectIdentifier tableIdentifier;
+
+    private final QueryOperation queryOperation;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier, QueryOperation queryOperation) {
+        super(tableIdentifier);
+        this.tableIdentifier = tableIdentifier;
+        this.queryOperation = queryOperation;
+    }
+
+    public QueryOperation getQueryOperation() {
+        return queryOperation;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table, does not support materialized table related operation.",
+                            tableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable oldResolvedMaterializedTable =
+                (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
+
+        // validate new schema and derived origin primary key and watermark spec
+        ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema();
+        ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema();
+        List<TableChange> tableChanges =
+                validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream()
+                        .map(TableChange::add)
+                        .collect(Collectors.toList());
+        ResolvedSchema newResolvedSchema =
+                new ResolvedSchema(
+                        resolvedQuerySchema.getColumns(),
+                        oldResolvedSchema.getWatermarkSpecs(),
+                        oldResolvedSchema.getPrimaryKey().orElse(null));
+        Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build();
+
+        // update schema and definition query
+        String definitionQuery = queryOperation.asSerializableString();
+        CatalogMaterializedTable catalogMaterializedTable =
+                oldResolvedMaterializedTable.getOrigin().copy(newSchema, definitionQuery);
+
+        ResolvedCatalogMaterializedTable newResolvedMaterializedTable =
+                new ResolvedCatalogMaterializedTable(catalogMaterializedTable, newResolvedSchema);
+        ctx.getCatalogManager()
+                .alterTable(newResolvedMaterializedTable, tableChanges, tableIdentifier, false);

Review Comment:
   We should call `AlterMaterializedTableChangeOperation` to perform the modification.
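   
   Roughly (assuming `AlterMaterializedTableChangeOperation` accepts the identifier, the table changes, and the updated table; the exact signature is not verified here):
   
   ```java
   // Delegate instead of calling CatalogManager#alterTable directly, so every
   // ALTER MATERIALIZED TABLE path goes through the same change operation.
   AlterMaterializedTableChangeOperation changeOperation =
           new AlterMaterializedTableChangeOperation(
                   tableIdentifier, tableChanges, newResolvedMaterializedTable);
   return changeOperation.execute(ctx);
   ```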



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,78 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != materializedTable.getRefreshStatus()) {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+                    == materializedTable.getRefreshMode()) {
+                suspendContinuousRefreshJob(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            } else {
+                suspendRefreshWorkflow(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            }
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema
+        operationExecutor.callExecutableOperation(handle, op);
+        ResolvedCatalogMaterializedTable updatedMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        // 3. resume the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != materializedTable.getRefreshStatus()) {

Review Comment:
   Why does this check `CatalogMaterializedTable.RefreshStatus.SUSPENDED != materializedTable.getRefreshStatus()`? I think we only need to resume the table when its RefreshStatus is SUSPENDED?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java:
##########
@@ -391,6 +392,17 @@ void testAlterMaterializedTableResume() {
                .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])");
     }
 
+    @Test
+    void testAlterMaterializedTableAsQuery() {

Review Comment:
   We should add more tests to cover the cases I commented on in this code review.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,78 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != materializedTable.getRefreshStatus()) {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+                    == materializedTable.getRefreshMode()) {
+                suspendContinuousRefreshJob(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            } else {
+                suspendRefreshWorkflow(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            }
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema
+        operationExecutor.callExecutableOperation(handle, op);

Review Comment:
   We shouldn't alter the materialized table metadata first; only after the refresh job or workflow has resumed successfully can we alter the materialized table definition query.
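   
   In other words, something like (the helper name is hypothetical, only meant to show the ordering):
   
   ```java
   // start the refresh job/workflow with the NEW definition query first ...
   resumeWithNewDefinitionQuery(operationExecutor, handle, op);
   // ... and only alter the catalog metadata once the resume succeeded
   operationExecutor.callExecutableOperation(handle, op);
   ```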



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,78 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != materializedTable.getRefreshStatus()) {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+                    == materializedTable.getRefreshMode()) {
+                suspendContinuousRefreshJob(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            } else {
+                suspendRefreshWorkflow(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            }
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema
+        operationExecutor.callExecutableOperation(handle, op);
+        ResolvedCatalogMaterializedTable updatedMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        // 3. resume the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != materializedTable.getRefreshStatus()) {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+                    == materializedTable.getRefreshMode()) {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } else {
+                // resume workflow
+                resumeRefreshWorkflow(
+                        operationExecutor,
+                        handle,
+                        tableIdentifier,
+                        updatedMaterializedTable,
+                        Collections.emptyMap());
+            }
+        } else {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS

Review Comment:
   I think that if we fail to start the continuous refresh job with the new definition query, we should resume the original refresh job and then throw the exception. So this action is not required?



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {
+
+    protected final ObjectIdentifier tableIdentifier;
+
+    private final QueryOperation queryOperation;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier, QueryOperation queryOperation) {
+        super(tableIdentifier);
+        this.tableIdentifier = tableIdentifier;
+        this.queryOperation = queryOperation;
+    }
+
+    public QueryOperation getQueryOperation() {
+        return queryOperation;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table, does not support materialized table related operation.",
+                            tableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable oldResolvedMaterializedTable =
+                (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
+
+        // validate new schema and derived origin primary key and watermark spec
+        ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema();
+        ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema();
+        List<TableChange> tableChanges =
+                validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream()
+                        .map(TableChange::add)
+                        .collect(Collectors.toList());
+        ResolvedSchema newResolvedSchema =
+                new ResolvedSchema(
+                        resolvedQuerySchema.getColumns(),
+                        oldResolvedSchema.getWatermarkSpecs(),
+                        oldResolvedSchema.getPrimaryKey().orElse(null));
+        Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build();
+
+        // update schema and definition query
+        String definitionQuery = queryOperation.asSerializableString();
+        CatalogMaterializedTable catalogMaterializedTable =
+                oldResolvedMaterializedTable.getOrigin().copy(newSchema, definitionQuery);
+
+        ResolvedCatalogMaterializedTable newResolvedMaterializedTable =
+                new ResolvedCatalogMaterializedTable(catalogMaterializedTable, newResolvedSchema);
+        ctx.getCatalogManager()
+                .alterTable(newResolvedMaterializedTable, tableChanges, tableIdentifier, false);
+
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("identifier", tableIdentifier);
+        return OperationUtils.formatWithChildren(
+                "ALTER MATERIALIZED TABLE",
+                params,
+                Collections.singletonList(queryOperation),

Review Comment:
   It would be better if we also printed the definition query.
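   
   For example, a sketch of the summary with the query included (the `definitionQuery` key is just a suggestion):
   
   ```java
   @Override
   public String asSummaryString() {
       Map<String, Object> params = new LinkedHashMap<>();
       params.put("identifier", tableIdentifier);
       // also surface the definition query in the summary
       params.put("definitionQuery", queryOperation.asSerializableString());
       return OperationUtils.formatWithChildren(
               "ALTER MATERIALIZED TABLE",
               params,
               Collections.singletonList(queryOperation),
               Operation::asSummaryString);
   }
   ```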



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {
+
+    protected final ObjectIdentifier tableIdentifier;
+
+    private final QueryOperation queryOperation;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier, QueryOperation queryOperation) {
+        super(tableIdentifier);
+        this.tableIdentifier = tableIdentifier;
+        this.queryOperation = queryOperation;
+    }
+
+    public QueryOperation getQueryOperation() {
+        return queryOperation;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table, does not support materialized table related operation.",
+                            tableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable oldResolvedMaterializedTable =
+                (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
+
+        // validate new schema and derived origin primary key and watermark spec
+        ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema();
+        ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema();
+        List<TableChange> tableChanges =
+                validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream()
+                        .map(TableChange::add)
+                        .collect(Collectors.toList());
+        ResolvedSchema newResolvedSchema =
+                new ResolvedSchema(
+                        resolvedQuerySchema.getColumns(),
+                        oldResolvedSchema.getWatermarkSpecs(),
+                        oldResolvedSchema.getPrimaryKey().orElse(null));
+        Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build();
+
+        // update schema and definition query
+        String definitionQuery = queryOperation.asSerializableString();

Review Comment:
   As discussed offline, the definition query shouldn't come from the QueryOperation; we should get it from the SqlNode directly.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {
+
+    protected final ObjectIdentifier tableIdentifier;
+
+    private final QueryOperation queryOperation;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier, QueryOperation queryOperation) {
+        super(tableIdentifier);
+        this.tableIdentifier = tableIdentifier;
+        this.queryOperation = queryOperation;
+    }
+
+    public QueryOperation getQueryOperation() {
+        return queryOperation;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table, does not support materialized table related operation.",
+                            tableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable oldResolvedMaterializedTable =
+                (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
+
+        // validate new schema and derived origin primary key and watermark spec
+        ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema();
+        ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema();
+        List<TableChange> tableChanges =
+                validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream()
+                        .map(TableChange::add)
+                        .collect(Collectors.toList());
+        ResolvedSchema newResolvedSchema =
+                new ResolvedSchema(
+                        resolvedQuerySchema.getColumns(),
+                        oldResolvedSchema.getWatermarkSpecs(),
+                        oldResolvedSchema.getPrimaryKey().orElse(null));
+        Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build();
+
+        // update schema and definition query
+        String definitionQuery = queryOperation.asSerializableString();
+        CatalogMaterializedTable catalogMaterializedTable =
+                oldResolvedMaterializedTable.getOrigin().copy(newSchema, definitionQuery);
+
+        ResolvedCatalogMaterializedTable newResolvedMaterializedTable =
+                new ResolvedCatalogMaterializedTable(catalogMaterializedTable, newResolvedSchema);
+        ctx.getCatalogManager()
+                .alterTable(newResolvedMaterializedTable, tableChanges, tableIdentifier, false);
+
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("identifier", tableIdentifier);
+        return OperationUtils.formatWithChildren(
+                "ALTER MATERIALIZED TABLE",
+                params,
+                Collections.singletonList(queryOperation),
+                Operation::asSummaryString);
+    }
+
+    private List<Column> validateAndExtractNewColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        List<Column> newAddedColumns = new ArrayList<>();
+
+        if (oldSchema.getColumns().size() > newSchema.getColumns().size()) {
+            throw new ValidationException(

Review Comment:
   The error message here should emphasize that deleting columns is not supported when altering the query, and it should print the original column count, the new column count, and how many columns were deleted.
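   
   For instance, a sketch of such a message (wording illustrative):
   
   ```java
   throw new ValidationException(
           String.format(
                   "Cannot alter the definition query: deleting columns is not supported. "
                           + "The original schema has %d columns, the new schema has %d, "
                           + "i.e. %d column(s) were deleted.",
                   oldSchema.getColumns().size(),
                   newSchema.getColumns().size(),
                   oldSchema.getColumns().size() - newSchema.getColumns().size()));
   ```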



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {
+
+    protected final ObjectIdentifier tableIdentifier;
+
+    private final QueryOperation queryOperation;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier, QueryOperation queryOperation) {
+        super(tableIdentifier);
+        this.tableIdentifier = tableIdentifier;
+        this.queryOperation = queryOperation;
+    }
+
+    public QueryOperation getQueryOperation() {
+        return queryOperation;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table, does not support materialized table related operation.",

Review Comment:
   ```suggestion
                            "Only materialized tables support modifying the definition query.",
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableResultImpl;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+import org.apache.flink.table.operations.QueryOperation;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE;
+
+/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. */
+@Internal
+public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation {
+
+    protected final ObjectIdentifier tableIdentifier;
+
+    private final QueryOperation queryOperation;
+
+    public AlterMaterializedTableAsQueryOperation(
+            ObjectIdentifier tableIdentifier, QueryOperation queryOperation) {
+        super(tableIdentifier);
+        this.tableIdentifier = tableIdentifier;
+        this.queryOperation = queryOperation;
+    }
+
+    public QueryOperation getQueryOperation() {
+        return queryOperation;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable();
+        if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) {
+            throw new ValidationException(
+                    String.format(
+                            "Table %s is not a materialized table, does not support materialized table related operation.",
+                            tableIdentifier));
+        }
+
+        ResolvedCatalogMaterializedTable oldResolvedMaterializedTable =
+                (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
+
+        // validate new schema and derived origin primary key and watermark spec
+        ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema();
+        ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema();
+        List<TableChange> tableChanges =
+                validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream()
+                        .map(TableChange::add)
+                        .collect(Collectors.toList());
+        ResolvedSchema newResolvedSchema =
+                new ResolvedSchema(
+                        resolvedQuerySchema.getColumns(),
+                        oldResolvedSchema.getWatermarkSpecs(),
+                        oldResolvedSchema.getPrimaryKey().orElse(null));
+        Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build();
+
+        // update schema and definition query
+        String definitionQuery = queryOperation.asSerializableString();
+        CatalogMaterializedTable catalogMaterializedTable =
+                oldResolvedMaterializedTable.getOrigin().copy(newSchema, definitionQuery);
+
+        ResolvedCatalogMaterializedTable newResolvedMaterializedTable =
+                new ResolvedCatalogMaterializedTable(catalogMaterializedTable, newResolvedSchema);
+        ctx.getCatalogManager()
+                .alterTable(newResolvedMaterializedTable, tableChanges, tableIdentifier, false);
+
+        return TableResultImpl.TABLE_RESULT_OK;
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("identifier", tableIdentifier);
+        return OperationUtils.formatWithChildren(
+                "ALTER MATERIALIZED TABLE",
+                params,
+                Collections.singletonList(queryOperation),
+                Operation::asSummaryString);
+    }
+
+    private List<Column> validateAndExtractNewColumns(
+            ResolvedSchema oldSchema, ResolvedSchema newSchema) {
+        List<Column> newAddedColumns = new ArrayList<>();
+
+        if (oldSchema.getColumns().size() > newSchema.getColumns().size()) {
+            throw new ValidationException(
+                    "Cannot alter table. The new schema has fewer columns than the original schema.");
+        }
+
+        for (int i = 0; i < oldSchema.getColumns().size(); i++) {
+            Column oldColumn = oldSchema.getColumns().get(i);
+            Column newColumn = newSchema.getColumns().get(i);
+            if (!oldColumn.equals(newColumn)) {
+                throw new ValidationException(
+                        String.format(

Review Comment:
   The error message here should highlight that when modifying the query, renaming columns, reordering columns, or inserting new columns in the middle is not supported; columns can only be appended at the end. Additionally, the original column info and the new column info at the mismatched position should both be included in the error message.
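   
   A sketch of what that could look like (wording illustrative; `i`, `oldColumn`, and `newColumn` come from the loop above):
   
   ```java
   throw new ValidationException(
           String.format(
                   "Cannot alter the definition query: renaming, reordering, or inserting "
                           + "columns in the middle is not supported; new columns can only "
                           + "be appended at the end. Mismatch at position %d: original "
                           + "column is [%s], new column is [%s].",
                   i, oldColumn, newColumn));
   ```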



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,78 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != materializedTable.getRefreshStatus()) {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+                    == materializedTable.getRefreshMode()) {
+                suspendContinuousRefreshJob(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            } else {
+                suspendRefreshWorkflow(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            }
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema
+        operationExecutor.callExecutableOperation(handle, op);
+        ResolvedCatalogMaterializedTable updatedMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        // 3. resume the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != materializedTable.getRefreshStatus()) {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+                    == materializedTable.getRefreshMode()) {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } else {
+                // resume workflow
+                resumeRefreshWorkflow(

Review Comment:
   For full mode, we don't need to suspend and resume the workflow. If the insert overwrite query can be compiled successfully, then we can alter the materialized table definition query directly.
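   
   Roughly (the compile-check helper below is hypothetical, only meant to show the intended ordering):
   
   ```java
   // In FULL mode, skip the suspend/resume cycle entirely.
   if (CatalogMaterializedTable.RefreshMode.FULL == materializedTable.getRefreshMode()) {
       // hypothetical helper: compile the INSERT OVERWRITE refresh statement to
       // validate the new definition query before touching any metadata
       validateRefreshStatementCompiles(operationExecutor, handle, op);
       // then alter the definition query directly, without suspending the workflow
       operationExecutor.callExecutableOperation(handle, op);
   }
   ```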



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,78 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);

Review Comment:
   We should first validate that the modified query is legal, and then compile the `insert into` or `insert overwrite` query for the corresponding refresh mode. Only after these two steps pass can we execute the following operations.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +808,78 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        // 1. suspend the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != materializedTable.getRefreshStatus()) {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+                    == materializedTable.getRefreshMode()) {
+                suspendContinuousRefreshJob(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            } else {
+                suspendRefreshWorkflow(
+                        operationExecutor, handle, tableIdentifier, materializedTable);
+            }
+        }
+
+        // 2. replace query definition and resume the materialized table
+        // alter materialized table schema
+        operationExecutor.callExecutableOperation(handle, op);
+        ResolvedCatalogMaterializedTable updatedMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, tableIdentifier);
+
+        // 3. resume the materialized table
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                != materializedTable.getRefreshStatus()) {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+                    == materializedTable.getRefreshMode()) {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } else {
+                // resume workflow
+                resumeRefreshWorkflow(
+                        operationExecutor,
+                        handle,
+                        tableIdentifier,
+                        updatedMaterializedTable,
+                        Collections.emptyMap());
+            }
+        } else {
+            if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
+                    == materializedTable.getRefreshMode()) {
+                // we should reset the savepoint path after the alter operation
+                ContinuousRefreshHandler refreshHandler =
+                        deserializeContinuousHandler(
+                                materializedTable.getSerializedRefreshHandler());
+                ContinuousRefreshHandler resetHandler =
+                        new ContinuousRefreshHandler(
+                                refreshHandler.getExecutionTarget(), refreshHandler.getJobId());
+                updateRefreshHandler(
+                        operationExecutor,
+                        handle,
+                        tableIdentifier,
+                        updatedMaterializedTable,
+                        updatedMaterializedTable.getRefreshStatus(),
+                        resetHandler.asSummaryString(),
+                        serializeContinuousHandler(resetHandler));
+            }
+        }
+

Review Comment:
   We should execute the alter materialized table operation here, after all the preparatory work has succeeded.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1027,6 +1027,351 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E
                                         fileSystemCatalogName,
                                         TEST_DEFAULT_DATABASE,
                                         "users_shops")));
+
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        String dataId = TestValuesTableFactory.registerData(Collections.emptyList());
+        String sourceDdl =
+                String.format(
+                        "CREATE TABLE IF NOT EXISTS my_new_source (\n"
+                                + "  order_id BIGINT,\n"
+                                + "  user_id BIGINT,\n"
+                                + "  shop_id BIGINT,\n"
+                                + "  order_amount DOUBLE,\n"
+                                + "  order_created_at STRING\n"
+                                + ")\n"
+                                + "WITH (\n"
+                                + "  'connector' = 'values',\n"
+                                + "  'bounded' = 'true',\n"
+                                + "  'data-id' = '%s'\n"
+                                + ")",
+                        dataId);
+        OperationHandle sourceHandle =
+                service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, sourceHandle);
+
+        // Alter materialized table as query in full mode
+        String alterMaterializedTableAsQueryDDL =
+                "ALTER MATERIALIZED TABLE users_shops"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  COUNT(order_id) AS order_cnt,\n"
+                        + "  SUM(order_amount) AS order_amount_sum\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, order_created_at AS ds, order_id, order_amount FROM my_new_source"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle alterMaterializedTableAsQueryHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle);
+
+        // verify the altered materialized table
+        ResolvedCatalogMaterializedTable newTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(oldTable.getResolvedSchema()).isNotEqualTo(newTable.getResolvedSchema());

Review Comment:
   Can we verify the schema diff?
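   
   For example (AssertJ; assumes the new schema must be the old columns plus appended ones):
   
   ```java
   // The altered schema should keep all original columns in place and only append.
   assertThat(newTable.getResolvedSchema().getColumns())
           .startsWith(oldTable.getResolvedSchema().getColumns().toArray(new Column[0]));
   assertThat(newTable.getResolvedSchema().getColumns())
           .hasSizeGreaterThan(oldTable.getResolvedSchema().getColumns().size());
   ```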



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -1027,6 +1027,351 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E
                                         fileSystemCatalogName,
                                         TEST_DEFAULT_DATABASE,
                                         "users_shops")));
+
+        dropMaterializedTable(
+                ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"));
+    }
+
+    @Test
+    void testAlterMaterializedTableAsQueryInFullMode() throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable oldTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        String dataId = TestValuesTableFactory.registerData(Collections.emptyList());
+        String sourceDdl =
+                String.format(
+                        "CREATE TABLE IF NOT EXISTS my_new_source (\n"

Review Comment:
   Do we need to create a new source table?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
