lsyldliu commented on code in PR #25880:
URL: https://github.com/apache/flink/pull/25880#discussion_r1907004614


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +809,160 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
+            // directly apply the alter operation
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, op.getTableChanges(), 
op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
+                == oldMaterializedTable.getRefreshStatus()) {
+            // 1. suspend the materialized table
+            CatalogMaterializedTable suspendMaterializedTable =
+                    suspendContinuousRefreshJob(
+                            operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
+
+            // 2. alter materialized table schema & query definition
+            CatalogMaterializedTable updatedMaterializedTable =
+                    op.getNewMaterializedTable()
+                            .copy(
+                                    
suspendMaterializedTable.getRefreshStatus(),
+                                    suspendMaterializedTable
+                                            .getRefreshHandlerDescription()
+                                            .orElse(null),
+                                    
suspendMaterializedTable.getSerializedRefreshHandler());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, op.getTableChanges(), 
updatedMaterializedTable);
+            operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+
+            // 3. resume the materialized table
+            try {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } catch (Exception e) {
+                // Roll back the changes to the materialized table and restore 
the continuous
+                // refresh job
+                LOG.warn(
+                        "Failed to resume the continuous refresh job for 
materialized table {}, rollback the alter materialized table as query 
operation.",
+                        tableIdentifier,
+                        e);
+
+                AlterMaterializedTableChangeOperation rollbackChangeOperation =
+                        generateRollbackAlterMaterializedTableOperation(
+                                suspendMaterializedTable, 
alterMaterializedTableChangeOperation);
+                operationExecutor.callExecutableOperation(handle, 
rollbackChangeOperation);
+
+                ContinuousRefreshHandler continuousRefreshHandler =
+                        deserializeContinuousHandler(
+                                
suspendMaterializedTable.getSerializedRefreshHandler());
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        suspendMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        continuousRefreshHandler.getRestorePath());
+
+                throw new SqlExecutionException(
+                        String.format(
+                                "Failed to alter materialized table as query 
operation for materialized table %s.",

Review Comment:
   ```suggestion
                                   "Failed to restore the continuous refresh job using original query {} when altering materialized table {} select query failure.",
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -804,6 +809,160 @@ protected static String getRefreshStatement(
         return insertStatement.toString();
     }
 
+    private ResultFetcher callAlterMaterializedTableAsQueryOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableAsQueryOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable oldMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (CatalogMaterializedTable.RefreshMode.FULL == 
oldMaterializedTable.getRefreshMode()) {
+            // directly apply the alter operation
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, op.getTableChanges(), 
op.getNewMaterializedTable());
+            return operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        }
+
+        if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
+                == oldMaterializedTable.getRefreshStatus()) {
+            // 1. suspend the materialized table
+            CatalogMaterializedTable suspendMaterializedTable =
+                    suspendContinuousRefreshJob(
+                            operationExecutor, handle, tableIdentifier, 
oldMaterializedTable);
+
+            // 2. alter materialized table schema & query definition
+            CatalogMaterializedTable updatedMaterializedTable =
+                    op.getNewMaterializedTable()
+                            .copy(
+                                    
suspendMaterializedTable.getRefreshStatus(),
+                                    suspendMaterializedTable
+                                            .getRefreshHandlerDescription()
+                                            .orElse(null),
+                                    
suspendMaterializedTable.getSerializedRefreshHandler());
+            AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            tableIdentifier, op.getTableChanges(), 
updatedMaterializedTable);
+            operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+
+            // 3. resume the materialized table
+            try {
+                executeContinuousRefreshJob(
+                        operationExecutor,
+                        handle,
+                        updatedMaterializedTable,
+                        tableIdentifier,
+                        Collections.emptyMap(),
+                        Optional.empty());
+            } catch (Exception e) {
+                // Roll back the changes to the materialized table and restore 
the continuous
+                // refresh job
+                LOG.warn(
+                        "Failed to resume the continuous refresh job for 
materialized table {}, rollback the alter materialized table as query 
operation.",

Review Comment:
   ```suggestion
                           "Failed to start the continuous refresh job for materialized table {} using new query {}, rollback to origin query {}.",
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to