lsyldliu commented on code in PR #25880: URL: https://github.com/apache/flink/pull/25880#discussion_r1900789652
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java: ########## @@ -97,6 +97,8 @@ CatalogMaterializedTable copy( String refreshHandlerDescription, byte[] serializedRefreshHandler); + CatalogMaterializedTable copy(Schema schema, String definitionQuery); + Review Comment: This is a public interface, so we can't introduce this method arbitrarily. Some connectors, such as Paimon, may implement this interface, and this would result in incompatible behavior. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. 
*/ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + protected final ObjectIdentifier tableIdentifier; + + private final QueryOperation queryOperation; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, QueryOperation queryOperation) { + super(tableIdentifier); + this.tableIdentifier = tableIdentifier; + this.queryOperation = queryOperation; + } + + public QueryOperation getQueryOperation() { + return queryOperation; + } + + @Override + public TableResultInternal execute(Context ctx) { + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable(); + if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) { + throw new ValidationException( + String.format( + "Table %s is not a materialized table, does not support materialized table related operation.", + tableIdentifier)); + } + + ResolvedCatalogMaterializedTable oldResolvedMaterializedTable = + (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable; + + // validate new schema and derived origin primary key and watermark spec + ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema(); + ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema(); + List<TableChange> tableChanges = + validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream() Review Comment: Regarding the TableChange overall design: 1. I think we should introduce a new `CatalogTableChange` interface extends TableChange, which represents the modification of CatalogTable, TableChange represents the modification of CatalogBaseTable, MaterializedTableChange represents the modification of CatalogMaterializedTable. And maybe in the future we will introduce CatalogViewChange. 2. 
Changes that are common to both CatalogTable and CatalogMaterializedTable, such as AddColumn, should implement both CatalogTableChange and MaterializedTableChange. Otherwise, a modification operation only needs to implement one of these two interfaces. 3. For altering the materialized table query, we need to introduce a new MaterializedTableChange called `ModifyDefinitionQuery`. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. 
*/ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + protected final ObjectIdentifier tableIdentifier; + + private final QueryOperation queryOperation; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, QueryOperation queryOperation) { + super(tableIdentifier); + this.tableIdentifier = tableIdentifier; + this.queryOperation = queryOperation; + } + + public QueryOperation getQueryOperation() { + return queryOperation; + } + + @Override + public TableResultInternal execute(Context ctx) { + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable(); + if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) { + throw new ValidationException( + String.format( + "Table %s is not a materialized table, does not support materialized table related operation.", + tableIdentifier)); + } + + ResolvedCatalogMaterializedTable oldResolvedMaterializedTable = + (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable; + + // validate new schema and derived origin primary key and watermark spec + ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema(); + ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema(); Review Comment: Do we can use the SchemaResolver to resolve the schema here? ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. 
*/ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + protected final ObjectIdentifier tableIdentifier; + + private final QueryOperation queryOperation; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, QueryOperation queryOperation) { + super(tableIdentifier); + this.tableIdentifier = tableIdentifier; + this.queryOperation = queryOperation; + } + + public QueryOperation getQueryOperation() { + return queryOperation; + } + + @Override + public TableResultInternal execute(Context ctx) { + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable(); + if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) { + throw new ValidationException( + String.format( + "Table %s is not a materialized table, does not support materialized table related operation.", + tableIdentifier)); + } + + ResolvedCatalogMaterializedTable oldResolvedMaterializedTable = + (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable; + + // validate new schema and derived origin primary key and watermark spec + ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema(); + ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema(); + List<TableChange> tableChanges = + validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream() + .map(TableChange::add) + .collect(Collectors.toList()); + ResolvedSchema newResolvedSchema = + new ResolvedSchema( + resolvedQuerySchema.getColumns(), + oldResolvedSchema.getWatermarkSpecs(), + oldResolvedSchema.getPrimaryKey().orElse(null)); + Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build(); + + // update schema and definition query + String definitionQuery = queryOperation.asSerializableString(); + CatalogMaterializedTable catalogMaterializedTable = + 
oldResolvedMaterializedTable.getOrigin().copy(newSchema, definitionQuery); + + ResolvedCatalogMaterializedTable newResolvedMaterializedTable = + new ResolvedCatalogMaterializedTable(catalogMaterializedTable, newResolvedSchema); + ctx.getCatalogManager() + .alterTable(newResolvedMaterializedTable, tableChanges, tableIdentifier, false); Review Comment: We should call the `AlterMaterializedTableChangeOperation` to complete the modify operation. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,78 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != materializedTable.getRefreshStatus()) { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS + == materializedTable.getRefreshMode()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, materializedTable); + } else { + suspendRefreshWorkflow( + operationExecutor, handle, tableIdentifier, materializedTable); + } + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema + operationExecutor.callExecutableOperation(handle, op); + ResolvedCatalogMaterializedTable updatedMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + // 3. 
resume the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != materializedTable.getRefreshStatus()) { Review Comment: Why does this check `CatalogMaterializedTable.RefreshStatus.SUSPENDED != materializedTable.getRefreshStatus()`? I think we only need to resume the table when its RefreshStatus is suspended? ########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java: ########## @@ -391,6 +392,17 @@ void testAlterMaterializedTableResume() { .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])"); } + @Test + void testAlterMaterializedTableAsQuery() { Review Comment: We should add more tests to cover the cases I commented on in this code review. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,78 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != materializedTable.getRefreshStatus()) { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS + == materializedTable.getRefreshMode()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, materializedTable); + } else { + suspendRefreshWorkflow( + operationExecutor, handle, tableIdentifier, materializedTable); + } + } + + // 2. 
replace query definition and resume the materialized table + // alter materialized table schema + operationExecutor.callExecutableOperation(handle, op); Review Comment: We shouldn't alter the materialized table metadata first; only after the refresh job or workflow has resumed successfully can we alter the materialized table definition query. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,78 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != materializedTable.getRefreshStatus()) { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS + == materializedTable.getRefreshMode()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, materializedTable); + } else { + suspendRefreshWorkflow( + operationExecutor, handle, tableIdentifier, materializedTable); + } + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema + operationExecutor.callExecutableOperation(handle, op); + ResolvedCatalogMaterializedTable updatedMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + // 3. 
resume the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != materializedTable.getRefreshStatus()) { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS + == materializedTable.getRefreshMode()) { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } else { + // resume workflow + resumeRefreshWorkflow( + operationExecutor, + handle, + tableIdentifier, + updatedMaterializedTable, + Collections.emptyMap()); + } + } else { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS Review Comment: I think if we fail to start the continuous refresh job using the new definition query, we should resume the original refresh job and then throw the exception. So this action is not required? ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. 
*/ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + protected final ObjectIdentifier tableIdentifier; + + private final QueryOperation queryOperation; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, QueryOperation queryOperation) { + super(tableIdentifier); + this.tableIdentifier = tableIdentifier; + this.queryOperation = queryOperation; + } + + public QueryOperation getQueryOperation() { + return queryOperation; + } + + @Override + public TableResultInternal execute(Context ctx) { + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable(); + if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) { + throw new ValidationException( + String.format( + "Table %s is not a materialized table, does not support materialized table related operation.", + tableIdentifier)); + } + + ResolvedCatalogMaterializedTable oldResolvedMaterializedTable = + (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable; + + // validate new schema and derived origin primary key and watermark spec + ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema(); + ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema(); + List<TableChange> tableChanges = + validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream() + .map(TableChange::add) + .collect(Collectors.toList()); + ResolvedSchema newResolvedSchema = + new ResolvedSchema( + resolvedQuerySchema.getColumns(), + oldResolvedSchema.getWatermarkSpecs(), + oldResolvedSchema.getPrimaryKey().orElse(null)); + Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build(); + + // update schema and definition query + String definitionQuery = queryOperation.asSerializableString(); + CatalogMaterializedTable catalogMaterializedTable = + 
oldResolvedMaterializedTable.getOrigin().copy(newSchema, definitionQuery); + + ResolvedCatalogMaterializedTable newResolvedMaterializedTable = + new ResolvedCatalogMaterializedTable(catalogMaterializedTable, newResolvedSchema); + ctx.getCatalogManager() + .alterTable(newResolvedMaterializedTable, tableChanges, tableIdentifier, false); + + return TableResultImpl.TABLE_RESULT_OK; + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("identifier", tableIdentifier); + return OperationUtils.formatWithChildren( + "ALTER MATERIALIZED TABLE", + params, + Collections.singletonList(queryOperation), Review Comment: It would be better we also print he definition query. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. 
*/ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + protected final ObjectIdentifier tableIdentifier; + + private final QueryOperation queryOperation; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, QueryOperation queryOperation) { + super(tableIdentifier); + this.tableIdentifier = tableIdentifier; + this.queryOperation = queryOperation; + } + + public QueryOperation getQueryOperation() { + return queryOperation; + } + + @Override + public TableResultInternal execute(Context ctx) { + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable(); + if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) { + throw new ValidationException( + String.format( + "Table %s is not a materialized table, does not support materialized table related operation.", + tableIdentifier)); + } + + ResolvedCatalogMaterializedTable oldResolvedMaterializedTable = + (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable; + + // validate new schema and derived origin primary key and watermark spec + ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema(); + ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema(); + List<TableChange> tableChanges = + validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream() + .map(TableChange::add) + .collect(Collectors.toList()); + ResolvedSchema newResolvedSchema = + new ResolvedSchema( + resolvedQuerySchema.getColumns(), + oldResolvedSchema.getWatermarkSpecs(), + oldResolvedSchema.getPrimaryKey().orElse(null)); + Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build(); + + // update schema and definition query + String definitionQuery = queryOperation.asSerializableString(); Review Comment: As discuss offline, the definition query shouldn't come from QueryOperation. 
We should get it from SqlNode directly. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. 
*/ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + protected final ObjectIdentifier tableIdentifier; + + private final QueryOperation queryOperation; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, QueryOperation queryOperation) { + super(tableIdentifier); + this.tableIdentifier = tableIdentifier; + this.queryOperation = queryOperation; + } + + public QueryOperation getQueryOperation() { + return queryOperation; + } + + @Override + public TableResultInternal execute(Context ctx) { + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable(); + if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) { + throw new ValidationException( + String.format( + "Table %s is not a materialized table, does not support materialized table related operation.", + tableIdentifier)); + } + + ResolvedCatalogMaterializedTable oldResolvedMaterializedTable = + (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable; + + // validate new schema and derived origin primary key and watermark spec + ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema(); + ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema(); + List<TableChange> tableChanges = + validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream() + .map(TableChange::add) + .collect(Collectors.toList()); + ResolvedSchema newResolvedSchema = + new ResolvedSchema( + resolvedQuerySchema.getColumns(), + oldResolvedSchema.getWatermarkSpecs(), + oldResolvedSchema.getPrimaryKey().orElse(null)); + Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build(); + + // update schema and definition query + String definitionQuery = queryOperation.asSerializableString(); + CatalogMaterializedTable catalogMaterializedTable = + 
oldResolvedMaterializedTable.getOrigin().copy(newSchema, definitionQuery); + + ResolvedCatalogMaterializedTable newResolvedMaterializedTable = + new ResolvedCatalogMaterializedTable(catalogMaterializedTable, newResolvedSchema); + ctx.getCatalogManager() + .alterTable(newResolvedMaterializedTable, tableChanges, tableIdentifier, false); + + return TableResultImpl.TABLE_RESULT_OK; + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("identifier", tableIdentifier); + return OperationUtils.formatWithChildren( + "ALTER MATERIALIZED TABLE", + params, + Collections.singletonList(queryOperation), + Operation::asSummaryString); + } + + private List<Column> validateAndExtractNewColumns( + ResolvedSchema oldSchema, ResolvedSchema newSchema) { + List<Column> newAddedColumns = new ArrayList<>(); + + if (oldSchema.getColumns().size() > newSchema.getColumns().size()) { + throw new ValidationException( Review Comment: The error message here should emphasize that deleting columns is not supported when altering the Query. And print the original column size and the new column size, how many columns we have deleted. ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. 
*/ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + protected final ObjectIdentifier tableIdentifier; + + private final QueryOperation queryOperation; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, QueryOperation queryOperation) { + super(tableIdentifier); + this.tableIdentifier = tableIdentifier; + this.queryOperation = queryOperation; + } + + public QueryOperation getQueryOperation() { + return queryOperation; + } + + @Override + public TableResultInternal execute(Context ctx) { + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable(); + if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) { + throw new ValidationException( + String.format( + "Table %s is not a materialized table, does not support materialized table related operation.", Review Comment: ```suggestion "Only materialized table support modify definition query.", ``` ########## flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableAsQueryOperation.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.internal.TableResultImpl; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.OperationUtils; +import org.apache.flink.table.operations.QueryOperation; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.table.catalog.CatalogBaseTable.TableKind.MATERIALIZED_TABLE; + +/** Operation to describe an ALTER MATERIALIZED TABLE AS query operation. 
*/ +@Internal +public class AlterMaterializedTableAsQueryOperation extends AlterMaterializedTableOperation { + + protected final ObjectIdentifier tableIdentifier; + + private final QueryOperation queryOperation; + + public AlterMaterializedTableAsQueryOperation( + ObjectIdentifier tableIdentifier, QueryOperation queryOperation) { + super(tableIdentifier); + this.tableIdentifier = tableIdentifier; + this.queryOperation = queryOperation; + } + + public QueryOperation getQueryOperation() { + return queryOperation; + } + + @Override + public TableResultInternal execute(Context ctx) { + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + ctx.getCatalogManager().getTableOrError(tableIdentifier).getResolvedTable(); + if (MATERIALIZED_TABLE != resolvedCatalogBaseTable.getTableKind()) { + throw new ValidationException( + String.format( + "Table %s is not a materialized table, does not support materialized table related operation.", + tableIdentifier)); + } + + ResolvedCatalogMaterializedTable oldResolvedMaterializedTable = + (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable; + + // validate new schema and derived origin primary key and watermark spec + ResolvedSchema resolvedQuerySchema = queryOperation.getResolvedSchema(); + ResolvedSchema oldResolvedSchema = oldResolvedMaterializedTable.getResolvedSchema(); + List<TableChange> tableChanges = + validateAndExtractNewColumns(oldResolvedSchema, resolvedQuerySchema).stream() + .map(TableChange::add) + .collect(Collectors.toList()); + ResolvedSchema newResolvedSchema = + new ResolvedSchema( + resolvedQuerySchema.getColumns(), + oldResolvedSchema.getWatermarkSpecs(), + oldResolvedSchema.getPrimaryKey().orElse(null)); + Schema newSchema = Schema.newBuilder().fromResolvedSchema(newResolvedSchema).build(); + + // update schema and definition query + String definitionQuery = queryOperation.asSerializableString(); + CatalogMaterializedTable catalogMaterializedTable = + 
oldResolvedMaterializedTable.getOrigin().copy(newSchema, definitionQuery); + + ResolvedCatalogMaterializedTable newResolvedMaterializedTable = + new ResolvedCatalogMaterializedTable(catalogMaterializedTable, newResolvedSchema); + ctx.getCatalogManager() + .alterTable(newResolvedMaterializedTable, tableChanges, tableIdentifier, false); + + return TableResultImpl.TABLE_RESULT_OK; + } + + @Override + public String asSummaryString() { + Map<String, Object> params = new LinkedHashMap<>(); + params.put("identifier", tableIdentifier); + return OperationUtils.formatWithChildren( + "ALTER MATERIALIZED TABLE", + params, + Collections.singletonList(queryOperation), + Operation::asSummaryString); + } + + private List<Column> validateAndExtractNewColumns( + ResolvedSchema oldSchema, ResolvedSchema newSchema) { + List<Column> newAddedColumns = new ArrayList<>(); + + if (oldSchema.getColumns().size() > newSchema.getColumns().size()) { + throw new ValidationException( + "Cannot alter table. The new schema has fewer columns than the original schema."); + } + + for (int i = 0; i < oldSchema.getColumns().size(); i++) { + Column oldColumn = oldSchema.getColumns().get(i); + Column newColumn = newSchema.getColumns().get(i); + if (!oldColumn.equals(newColumn)) { + throw new ValidationException( + String.format( Review Comment: The error message here should highlight that when modifying a query, renaming columns, moving columns, or adding new columns in the middle position is not supported; columns can only be appended at the end. Additionally, the original column info and the new column info should both be included in the error message for the corresponding positions. 
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,78 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + // 1. suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != materializedTable.getRefreshStatus()) { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS + == materializedTable.getRefreshMode()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, materializedTable); + } else { + suspendRefreshWorkflow( + operationExecutor, handle, tableIdentifier, materializedTable); + } + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema + operationExecutor.callExecutableOperation(handle, op); + ResolvedCatalogMaterializedTable updatedMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + // 3. resume the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != materializedTable.getRefreshStatus()) { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS + == materializedTable.getRefreshMode()) { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } else { + // resume workflow + resumeRefreshWorkflow( Review Comment: For full mode, we don't need to suspend and resume the workflow. 
If the insert overwrite query can be compiled successfully, then we alter the materialized table definition query directly. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,78 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); Review Comment: We should first validate that the modified query is legal, then compile the `insert into` or `insert overwrite` query for the corresponding refresh mode. Only after both of these steps have passed can we execute the following operations. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +808,78 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable materializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + // 1. 
suspend the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != materializedTable.getRefreshStatus()) { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS + == materializedTable.getRefreshMode()) { + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, materializedTable); + } else { + suspendRefreshWorkflow( + operationExecutor, handle, tableIdentifier, materializedTable); + } + } + + // 2. replace query definition and resume the materialized table + // alter materialized table schema + operationExecutor.callExecutableOperation(handle, op); + ResolvedCatalogMaterializedTable updatedMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + // 3. resume the materialized table + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + != materializedTable.getRefreshStatus()) { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS + == materializedTable.getRefreshMode()) { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } else { + // resume workflow + resumeRefreshWorkflow( + operationExecutor, + handle, + tableIdentifier, + updatedMaterializedTable, + Collections.emptyMap()); + } + } else { + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS + == materializedTable.getRefreshMode()) { + // we should reset the savepoint path after the alter operation + ContinuousRefreshHandler refreshHandler = + deserializeContinuousHandler( + materializedTable.getSerializedRefreshHandler()); + ContinuousRefreshHandler resetHandler = + new ContinuousRefreshHandler( + refreshHandler.getExecutionTarget(), refreshHandler.getJobId()); + updateRefreshHandler( + operationExecutor, + handle, + tableIdentifier, + updatedMaterializedTable, + updatedMaterializedTable.getRefreshStatus(), + resetHandler.asSummaryString(), + serializeContinuousHandler(resetHandler)); + } + } + Review Comment: We should 
execute the alter materialized table operation here, only after all preparatory steps have succeeded. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -1027,6 +1027,351 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"))); + + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testAlterMaterializedTableAsQueryInFullMode() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + String dataId = TestValuesTableFactory.registerData(Collections.emptyList()); + String sourceDdl = + String.format( + "CREATE TABLE IF NOT EXISTS my_new_source (\n" + + " order_id BIGINT,\n" + + " user_id BIGINT,\n" + + " shop_id BIGINT,\n" + + " order_amount DOUBLE,\n" + + " order_created_at STRING\n" + + ")\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'bounded' = 'true',\n" + + " 'data-id' = '%s'\n" + + ")", + dataId); + OperationHandle sourceHandle = + service.executeStatement(sessionHandle, sourceDdl, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, sourceHandle); + + // Alter materialized table as query in full mode + String alterMaterializedTableAsQueryDDL = + "ALTER MATERIALIZED TABLE users_shops" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " COUNT(order_id) AS order_cnt,\n" + + " SUM(order_amount) AS order_amount_sum\n" + + " FROM (\n" + + " SELECT user_id, shop_id, order_created_at AS ds, order_id, order_amount FROM my_new_source" + + " ) AS tmp\n" + + " 
GROUP BY (user_id, shop_id, ds)"; + + OperationHandle alterMaterializedTableAsQueryHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableAsQueryDDL, -1, new Configuration()); + + awaitOperationTermination(service, sessionHandle, alterMaterializedTableAsQueryHandle); + + // verify the altered materialized table + ResolvedCatalogMaterializedTable newTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(oldTable.getResolvedSchema()).isNotEqualTo(newTable.getResolvedSchema()); Review Comment: Can we verify the schema diff? ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ########## @@ -1027,6 +1027,351 @@ void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws E fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops"))); + + dropMaterializedTable( + ObjectIdentifier.of(fileSystemCatalogName, TEST_DEFAULT_DATABASE, "users_shops")); + } + + @Test + void testAlterMaterializedTableAsQueryInFullMode() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable oldTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + String dataId = TestValuesTableFactory.registerData(Collections.emptyList()); + String sourceDdl = + String.format( + "CREATE TABLE IF NOT EXISTS my_new_source (\n" Review Comment: Do we need to create a new source table? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org