lsyldliu commented on code in PR #25880: URL: https://github.com/apache/flink/pull/25880#discussion_r1908095796
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ########## @@ -804,6 +809,160 @@ protected static String getRefreshStatement( return insertStatement.toString(); } + private ResultFetcher callAlterMaterializedTableAsQueryOperation( + OperationExecutor operationExecutor, + OperationHandle handle, + AlterMaterializedTableAsQueryOperation op) { + ObjectIdentifier tableIdentifier = op.getTableIdentifier(); + CatalogMaterializedTable oldMaterializedTable = + getCatalogMaterializedTable(operationExecutor, tableIdentifier); + + if (CatalogMaterializedTable.RefreshMode.FULL == oldMaterializedTable.getRefreshMode()) { + // directly apply the alter operation + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, op.getTableChanges(), op.getNewMaterializedTable()); + return operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + } + + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == oldMaterializedTable.getRefreshStatus()) { + // 1. suspend the materialized table + CatalogMaterializedTable suspendMaterializedTable = + suspendContinuousRefreshJob( + operationExecutor, handle, tableIdentifier, oldMaterializedTable); + + // 2. alter materialized table schema & query definition + CatalogMaterializedTable updatedMaterializedTable = + op.getNewMaterializedTable() + .copy( + suspendMaterializedTable.getRefreshStatus(), + suspendMaterializedTable + .getRefreshHandlerDescription() + .orElse(null), + suspendMaterializedTable.getSerializedRefreshHandler()); + AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation = + new AlterMaterializedTableChangeOperation( + tableIdentifier, op.getTableChanges(), updatedMaterializedTable); + operationExecutor.callExecutableOperation( + handle, alterMaterializedTableChangeOperation); + + // 3. 
resume the materialized table + try { + executeContinuousRefreshJob( + operationExecutor, + handle, + updatedMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + Optional.empty()); + } catch (Exception e) { + // Roll back the changes to the materialized table and restore the continuous + // refresh job + LOG.warn( + "Failed to resume the continuous refresh job for materialized table {}, rollback the alter materialized table as query operation.", + tableIdentifier, + e); + + AlterMaterializedTableChangeOperation rollbackChangeOperation = + generateRollbackAlterMaterializedTableOperation( + suspendMaterializedTable, alterMaterializedTableChangeOperation); + operationExecutor.callExecutableOperation(handle, rollbackChangeOperation); + + ContinuousRefreshHandler continuousRefreshHandler = + deserializeContinuousHandler( + suspendMaterializedTable.getSerializedRefreshHandler()); + executeContinuousRefreshJob( + operationExecutor, + handle, + suspendMaterializedTable, + tableIdentifier, + Collections.emptyMap(), + continuousRefreshHandler.getRestorePath()); + + throw new SqlExecutionException( + String.format( + "Failed to alter materialized table as query operation for materialized table %s.", Review Comment: BTW, I think we can print the log only right before throwing the exception; the current placement misled me. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org