This is an automated email from the ASF dual-hosted git repository. morrysnow pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new c5d4b1173d0 branch-2.1: [enhance](mtmv)Mtmv support audit log #41499 (#47923) c5d4b1173d0 is described below commit c5d4b1173d05fa00b0df1438f012f37606576c03 Author: zhangdong <zhangd...@selectdb.com> AuthorDate: Thu Feb 20 14:01:54 2025 +0800 branch-2.1: [enhance](mtmv)Mtmv support audit log #41499 (#47923) pick from master #41499 --- .../apache/doris/job/extensions/mtmv/MTMVTask.java | 43 +++++++++++++++------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java index 4cd4fd2b586..da106eef2cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java @@ -49,6 +49,7 @@ import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.glue.LogicalPlanAdapter; import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand; import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo; +import org.apache.doris.qe.AuditLogHelper; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.QueryState.MysqlStateType; import org.apache.doris.qe.StmtExecutor; @@ -215,7 +216,7 @@ public class MTMVTask extends AbstractTask { // need get names before exec Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil .generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames); - exec(ctx, execPartitionNames, tableWithPartKey); + exec(execPartitionNames, tableWithPartKey); completedPartitions.addAll(execPartitionNames); partitionSnapshots.putAll(execPartitionSnapshots); } @@ -230,10 +231,10 @@ public class MTMVTask extends AbstractTask { } } - private void exec(ConnectContext ctx, Set<String> refreshPartitionNames, + private void exec(Set<String> refreshPartitionNames, Map<TableIf, String> tableWithPartKey) throws Exception { - Objects.requireNonNull(ctx, "ctx should not be null"); + ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv); StatementContext statementContext = new StatementContext(); for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) { statementContext.setSnapshot(entry.getKey(), entry.getValue()); @@ -245,20 +246,34 @@ public class MTMVTask extends AbstractTask { UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE ? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey); - executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); - ctx.setExecutor(executor); - ctx.setQueryId(queryId); - ctx.getState().setNereids(true); - command.run(ctx, executor); - if (getStatus() == TaskStatus.CANCELED) { - // Throwing an exception to interrupt subsequent partition update tasks - throw new JobException("task is CANCELED"); - } - if (ctx.getState().getStateType() != MysqlStateType.OK) { - throw new JobException(ctx.getState().getErrorMessage()); + try { + executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext())); + ctx.setExecutor(executor); + ctx.setQueryId(queryId); + ctx.getState().setNereids(true); + command.run(ctx, executor); + if (getStatus() == TaskStatus.CANCELED) { + // Throwing an exception to interrupt subsequent partition update tasks + throw new JobException("task is CANCELED"); + } + if (ctx.getState().getStateType() != MysqlStateType.OK) { + throw new JobException(ctx.getState().getErrorMessage()); + } + } finally { + if (executor != null) { + AuditLogHelper.logAuditLog(ctx, getDummyStmt(refreshPartitionNames), + executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(), true); + } } } + private String getDummyStmt(Set<String> refreshPartitionNames) { + return String.format( + "Asynchronous materialized view refresh task, mvName: %s," + + "taskId: %s, partitions refreshed by this insert overwrite: %s", + mtmv.getName(), super.getTaskId(), refreshPartitionNames); + } + @Override public synchronized void onFail() throws JobException { LOG.info("mtmv task onFail, taskId: {}", super.getTaskId()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org