This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new c5d4b1173d0 branch-2.1: [enhance](mtmv)Mtmv support audit log #41499 
(#47923)
c5d4b1173d0 is described below

commit c5d4b1173d05fa00b0df1438f012f37606576c03
Author: zhangdong <zhangd...@selectdb.com>
AuthorDate: Thu Feb 20 14:01:54 2025 +0800

    branch-2.1: [enhance](mtmv)Mtmv support audit log #41499 (#47923)
    
    pick from master #41499
---
 .../apache/doris/job/extensions/mtmv/MTMVTask.java | 43 +++++++++++++++-------
 1 file changed, 29 insertions(+), 14 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
index 4cd4fd2b586..da106eef2cb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java
@@ -49,6 +49,7 @@ import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.glue.LogicalPlanAdapter;
 import 
org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
 import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
+import org.apache.doris.qe.AuditLogHelper;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QueryState.MysqlStateType;
 import org.apache.doris.qe.StmtExecutor;
@@ -215,7 +216,7 @@ public class MTMVTask extends AbstractTask {
                 // need get names before exec
                 Map<String, MTMVRefreshPartitionSnapshot> 
execPartitionSnapshots = MTMVPartitionUtil
                         .generatePartitionSnapshots(context, 
relation.getBaseTablesOneLevel(), execPartitionNames);
-                exec(ctx, execPartitionNames, tableWithPartKey);
+                exec(execPartitionNames, tableWithPartKey);
                 completedPartitions.addAll(execPartitionNames);
                 partitionSnapshots.putAll(execPartitionSnapshots);
             }
@@ -230,10 +231,10 @@ public class MTMVTask extends AbstractTask {
         }
     }
 
-    private void exec(ConnectContext ctx, Set<String> refreshPartitionNames,
+    private void exec(Set<String> refreshPartitionNames,
             Map<TableIf, String> tableWithPartKey)
             throws Exception {
-        Objects.requireNonNull(ctx, "ctx should not be null");
+        ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
         StatementContext statementContext = new StatementContext();
         for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
             statementContext.setSnapshot(entry.getKey(), entry.getValue());
@@ -245,20 +246,34 @@ public class MTMVTask extends AbstractTask {
         UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
                 .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != 
MTMVPartitionType.SELF_MANAGE
                         ? refreshPartitionNames : Sets.newHashSet(), 
tableWithPartKey);
-        executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, 
ctx.getStatementContext()));
-        ctx.setExecutor(executor);
-        ctx.setQueryId(queryId);
-        ctx.getState().setNereids(true);
-        command.run(ctx, executor);
-        if (getStatus() == TaskStatus.CANCELED) {
-            // Throwing an exception to interrupt subsequent partition update 
tasks
-            throw new JobException("task is CANCELED");
-        }
-        if (ctx.getState().getStateType() != MysqlStateType.OK) {
-            throw new JobException(ctx.getState().getErrorMessage());
+        try {
+            executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, 
ctx.getStatementContext()));
+            ctx.setExecutor(executor);
+            ctx.setQueryId(queryId);
+            ctx.getState().setNereids(true);
+            command.run(ctx, executor);
+            if (getStatus() == TaskStatus.CANCELED) {
+                // Throwing an exception to interrupt subsequent partition 
update tasks
+                throw new JobException("task is CANCELED");
+            }
+            if (ctx.getState().getStateType() != MysqlStateType.OK) {
+                throw new JobException(ctx.getState().getErrorMessage());
+            }
+        } finally {
+            if (executor != null) {
+                AuditLogHelper.logAuditLog(ctx, 
getDummyStmt(refreshPartitionNames),
+                        executor.getParsedStmt(), 
executor.getQueryStatisticsForAuditLog(), true);
+            }
         }
     }
 
+    private String getDummyStmt(Set<String> refreshPartitionNames) {
+        return String.format(
+                "Asynchronous materialized view refresh task, mvName: %s,"
+                        + "taskId: %s, partitions refreshed by this insert 
overwrite: %s",
+                mtmv.getName(), super.getTaskId(), refreshPartitionNames);
+    }
+
     @Override
     public synchronized void onFail() throws JobException {
         LOG.info("mtmv task onFail, taskId: {}", super.getTaskId());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to