morrySnow commented on code in PR #49041: URL: https://github.com/apache/doris/pull/49041#discussion_r2043512021
########## fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java: ########## @@ -809,16 +818,26 @@ private void replaceTableInternal(Database db, OlapTable origTable, OlapTable ne String newTblName = newTbl.getName(); // drop origin table and new table db.unregisterTable(oldTblName); + if (origTable.getType() == TableType.MATERIALIZED_VIEW) { + Env.getCurrentEnv().getMtmvService().deregisterMTMV((MTMV) origTable); Review Comment: what's the diff between unregister and deregister? ########## fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java: ########## @@ -247,6 +252,34 @@ public void run() throws JobException { } } + private void checkColumnTypeIfChange(MTMV mtmv, ConnectContext ctx) throws JobException { + List<ColumnDefinition> derivedColumnsDefinition = MTMVPlanUtil.generateColumnsBySql(mtmv.getQuerySql(), ctx, + mtmv.getMvPartitionInfo().getPartitionCol(), + mtmv.getDistributionColumnNames(), null, mtmv.getTableProperty().getProperties()); + List<Column> derivedColumns = derivedColumnsDefinition.stream() + .map(ColumnDefinition::translateToCatalogStyle) + .collect(Collectors.toList()); + List<Column> currentColumns = mtmv.getBaseSchemaColumns(); + if (derivedColumns.size() != currentColumns.size()) { + throw new JobException("column length not equals, please check columns of base table if changed"); Review Comment: print original and current column size. 
please use LLM to optimize the error message ########## fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java: ########## @@ -2616,6 +2616,11 @@ public int getBaseSchemaVersion() { return baseIndexMeta.getSchemaVersion(); } + public List<Column> getBaseSchemaColumns() { Review Comment: what's the diff between this function and `getBaseSchema` ########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java: ########## @@ -150,4 +173,117 @@ public static Set<TableIf> getBaseTableFromQuery(String querySql, ConnectContext } } } + + public static List<ColumnDefinition> generateColumnsBySql(String querySql, ConnectContext ctx, String partitionCol, + Set<String> distributionColumnNames, List<SimpleColumnDefinition> simpleColumnDefinitions, + Map<String, String> properties) { + List<StatementBase> statements; + try { + statements = new NereidsParser().parseSQL(querySql); + } catch (Exception e) { + throw new ParseException("Nereids parse failed. " + e.getMessage()); + } + StatementBase parsedStmt = statements.get(0); + LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + StatementContext original = ctx.getStatementContext(); + try (StatementContext tempCtx = new StatementContext()) { + ctx.setStatementContext(tempCtx); + try { + NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); + Plan plan = planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN); + return generateColumns(plan, ctx, partitionCol, distributionColumnNames, simpleColumnDefinitions, + properties); + } finally { + ctx.setStatementContext(original); + } + } + } + + public static List<ColumnDefinition> generateColumns(Plan plan, ConnectContext ctx, String partitionCol, + Set<String> distributionColumnNames, List<SimpleColumnDefinition> simpleColumnDefinitions, + Map<String, String> properties) { + List<ColumnDefinition> columns = Lists.newArrayList(); + List<Slot> slots = plan.getOutput(); + if (slots.isEmpty()) 
{ + throw new org.apache.doris.nereids.exceptions.AnalysisException("table should contain at least one column"); + } + if (!CollectionUtils.isEmpty(simpleColumnDefinitions) && simpleColumnDefinitions.size() != slots.size()) { + throw new org.apache.doris.nereids.exceptions.AnalysisException( + "simpleColumnDefinitions size is not equal to the query's"); + } + Set<String> colNames = Sets.newHashSet(); + for (int i = 0; i < slots.size(); i++) { + String colName = CollectionUtils.isEmpty(simpleColumnDefinitions) ? slots.get(i).getName() + : simpleColumnDefinitions.get(i).getName(); + try { + FeNameFormat.checkColumnName(colName); + } catch (org.apache.doris.common.AnalysisException e) { + throw new org.apache.doris.nereids.exceptions.AnalysisException(e.getMessage(), e); + } + if (colNames.contains(colName)) { + throw new org.apache.doris.nereids.exceptions.AnalysisException("repeat cols:" + colName); + } else { + colNames.add(colName); + } + DataType dataType = getDataType(slots.get(i), i, ctx, partitionCol, distributionColumnNames); + // If datatype is AggStateType, AggregateType should be generic, or column definition check will fail + columns.add(new ColumnDefinition( + colName, + dataType, + false, + slots.get(i).getDataType() instanceof AggStateType ? AggregateType.GENERIC : null, + slots.get(i).nullable(), + Optional.empty(), + CollectionUtils.isEmpty(simpleColumnDefinitions) ? 
null + : simpleColumnDefinitions.get(i).getComment())); + } + // add a hidden column as row store + if (properties != null) { + try { + boolean storeRowColumn = + PropertyAnalyzer.analyzeStoreRowColumn(Maps.newHashMap(properties)); + if (storeRowColumn) { + columns.add(ColumnDefinition.newRowStoreColumnDefinition(null)); + } + } catch (Exception e) { + throw new org.apache.doris.nereids.exceptions.AnalysisException(e.getMessage(), e.getCause()); + } + } + return columns; + } + + private static DataType getDataType(Slot s, int i, ConnectContext ctx, String partitionCol, Review Comment: Please add a unit test for this function. ########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java: ########## @@ -150,4 +173,117 @@ public static Set<TableIf> getBaseTableFromQuery(String querySql, ConnectContext } } } + + public static List<ColumnDefinition> generateColumnsBySql(String querySql, ConnectContext ctx, String partitionCol, Review Comment: Please add a Javadoc-style comment explaining what this function does, its input parameters, and its return value. ########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java: ########## @@ -150,4 +173,117 @@ public static Set<TableIf> getBaseTableFromQuery(String querySql, ConnectContext } } } + + public static List<ColumnDefinition> generateColumnsBySql(String querySql, ConnectContext ctx, String partitionCol, + Set<String> distributionColumnNames, List<SimpleColumnDefinition> simpleColumnDefinitions, + Map<String, String> properties) { + List<StatementBase> statements; + try { + statements = new NereidsParser().parseSQL(querySql); + } catch (Exception e) { + throw new ParseException("Nereids parse failed. 
" + e.getMessage()); + } + StatementBase parsedStmt = statements.get(0); + LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + StatementContext original = ctx.getStatementContext(); + try (StatementContext tempCtx = new StatementContext()) { + ctx.setStatementContext(tempCtx); + try { + NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); + Plan plan = planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN); + return generateColumns(plan, ctx, partitionCol, distributionColumnNames, simpleColumnDefinitions, + properties); + } finally { + ctx.setStatementContext(original); + } + } + } + + public static List<ColumnDefinition> generateColumns(Plan plan, ConnectContext ctx, String partitionCol, Review Comment: add java doc style's comment to explain what this function want to do, the input paramters and the return value ########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java: ########## @@ -207,12 +209,19 @@ public void processEvent(Event event) throws EventException { } } - private boolean canRefresh(MTMV mtmv, TableIf table) { + private boolean canRefresh(MTMV mtmv, TableIf table) { if (mtmv.getExcludedTriggerTables().contains(table.getName())) { LOG.info("skip refresh mtmv: {}, because exclude trigger table: {}", mtmv.getName(), table.getName()); return false; } + // replace/alter base table,not change MTMVRelation, only change MTMV to schema_change, + // Therefore, it may trigger incorrect materialized view refresh + if (mtmv.getStatus().getState().equals(MTMVState.SCHEMA_CHANGE)) { + LOG.info("skip refresh mtmv: {}, because state is SCHEMA_CHANGE, trigger table: {}", + mtmv.getName(), table.getName()); + return false; Review Comment: why we check here, i think it is better to check it when we run refresh task? 
########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java: ########## @@ -207,12 +209,19 @@ public void processEvent(Event event) throws EventException { } } - private boolean canRefresh(MTMV mtmv, TableIf table) { + private boolean canRefresh(MTMV mtmv, TableIf table) { if (mtmv.getExcludedTriggerTables().contains(table.getName())) { LOG.info("skip refresh mtmv: {}, because exclude trigger table: {}", mtmv.getName(), table.getName()); return false; } + // replace/alter base table,not change MTMVRelation, only change MTMV to schema_change, + // Therefore, it may trigger incorrect materialized view refresh + if (mtmv.getStatus().getState().equals(MTMVState.SCHEMA_CHANGE)) { + LOG.info("skip refresh mtmv: {}, because state is SCHEMA_CHANGE, trigger table: {}", + mtmv.getName(), table.getName()); + return false; + } Review Comment: If we do not refresh when the status is `SCHEMA_CHANGE`, how can we refresh an MTMV whose status was set to `SCHEMA_CHANGE` during an upgrade? ########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java: ########## @@ -150,4 +173,117 @@ public static Set<TableIf> getBaseTableFromQuery(String querySql, ConnectContext } } } + + public static List<ColumnDefinition> generateColumnsBySql(String querySql, ConnectContext ctx, String partitionCol, Review Comment: Please add a unit test for this function. ########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVPlanUtil.java: ########## @@ -150,4 +173,117 @@ public static Set<TableIf> getBaseTableFromQuery(String querySql, ConnectContext } } } + + public static List<ColumnDefinition> generateColumnsBySql(String querySql, ConnectContext ctx, String partitionCol, + Set<String> distributionColumnNames, List<SimpleColumnDefinition> simpleColumnDefinitions, + Map<String, String> properties) { + List<StatementBase> statements; + try { + statements = new NereidsParser().parseSQL(querySql); + } catch (Exception e) { + throw new ParseException("Nereids parse failed. 
" + e.getMessage()); + } + StatementBase parsedStmt = statements.get(0); + LogicalPlan logicalPlan = ((LogicalPlanAdapter) parsedStmt).getLogicalPlan(); + StatementContext original = ctx.getStatementContext(); + try (StatementContext tempCtx = new StatementContext()) { + ctx.setStatementContext(tempCtx); + try { + NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext()); + Plan plan = planner.planWithLock(logicalPlan, PhysicalProperties.ANY, ExplainLevel.ANALYZED_PLAN); + return generateColumns(plan, ctx, partitionCol, distributionColumnNames, simpleColumnDefinitions, + properties); + } finally { + ctx.setStatementContext(original); + } + } + } + + public static List<ColumnDefinition> generateColumns(Plan plan, ConnectContext ctx, String partitionCol, Review Comment: add ut for this function ########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java: ########## @@ -207,12 +209,19 @@ public void processEvent(Event event) throws EventException { } } - private boolean canRefresh(MTMV mtmv, TableIf table) { + private boolean canRefresh(MTMV mtmv, TableIf table) { if (mtmv.getExcludedTriggerTables().contains(table.getName())) { LOG.info("skip refresh mtmv: {}, because exclude trigger table: {}", mtmv.getName(), table.getName()); return false; } + // replace/alter base table,not change MTMVRelation, only change MTMV to schema_change, + // Therefore, it may trigger incorrect materialized view refresh + if (mtmv.getStatus().getState().equals(MTMVState.SCHEMA_CHANGE)) { + LOG.info("skip refresh mtmv: {}, because state is SCHEMA_CHANGE, trigger table: {}", + mtmv.getName(), table.getName()); + return false; + } return mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT); Review Comment: if this function only handle commit trigger, we should change this function's name to represent this trait. 
########## fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java: ########## @@ -247,6 +252,34 @@ public void run() throws JobException { } } + private void checkColumnTypeIfChange(MTMV mtmv, ConnectContext ctx) throws JobException { + List<ColumnDefinition> derivedColumnsDefinition = MTMVPlanUtil.generateColumnsBySql(mtmv.getQuerySql(), ctx, + mtmv.getMvPartitionInfo().getPartitionCol(), + mtmv.getDistributionColumnNames(), null, mtmv.getTableProperty().getProperties()); + List<Column> derivedColumns = derivedColumnsDefinition.stream() + .map(ColumnDefinition::translateToCatalogStyle) + .collect(Collectors.toList()); + List<Column> currentColumns = mtmv.getBaseSchemaColumns(); + if (derivedColumns.size() != currentColumns.size()) { + throw new JobException("column length not equals, please check columns of base table if changed"); + } + for (int i = 0; i < currentColumns.size(); i++) { + if (!isTypeLike(currentColumns.get(i).getType(), derivedColumns.get(i).getType())) { + throw new JobException( + "column type not like, please check columns of base table if changed, columnName: " Review Comment: ```suggestion "column type not same, please check whether columns of base table have changed, column name is ..., original type is ..., current type is ..." ``` ########## fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java: ########## @@ -809,16 +818,26 @@ private void replaceTableInternal(Database db, OlapTable origTable, OlapTable ne String newTblName = newTbl.getName(); // drop origin table and new table db.unregisterTable(oldTblName); + if (origTable.getType() == TableType.MATERIALIZED_VIEW) { Review Comment: could we call deregisterMTMV in db.unregisterTable directly? 
########## fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java: ########## @@ -4996,16 +4996,27 @@ public void renameTable(Database db, Table table, String newTableName) throws Dd throw new DdlException("Table name[" + newTableName + "] is already used (in restoring)"); } + if (table.isManagedTable()) { Review Comment: Why does this need to be checked first? Could you add a comment to explain it? ########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java: ########## @@ -131,11 +133,11 @@ public void dropTable(Table table) { } } - public void alterTable(Table table, String oldTableName) { - Objects.requireNonNull(table); - LOG.info("alterTable, tableName: {}", table.getName()); + public void alterTable(BaseTableInfo oldTableInfo, Optional<BaseTableInfo> newTableInfo, boolean isReplace) { + Objects.requireNonNull(oldTableInfo); + LOG.info("alterTable, tableName: {}", oldTableInfo); Review Comment: If you want to log this info, please make the log message clearer; the current message is not helpful because we cannot relate it to an MTMV. ########## fe/fe-core/src/main/java/org/apache/doris/mtmv/MTMVService.java: ########## @@ -131,11 +133,11 @@ public void dropTable(Table table) { } } - public void alterTable(Table table, String oldTableName) { - Objects.requireNonNull(table); - LOG.info("alterTable, tableName: {}", table.getName()); + public void alterTable(BaseTableInfo oldTableInfo, Optional<BaseTableInfo> newTableInfo, boolean isReplace) { + Objects.requireNonNull(oldTableInfo); Review Comment: Please add an error message to this `Objects.requireNonNull` call. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org