This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9a9463d5953 [fix](cloud) overwrite job statistic without data quality check when update cloud progress (#39790)
9a9463d5953 is described below

commit 9a9463d5953fc9af3697479d6c3d733fba59b555
Author: hui lai <1353307...@qq.com>
AuthorDate: Tue Aug 27 17:36:57 2024 +0800

    [fix](cloud) overwrite job statistic without data quality check when update cloud progress (#39790)
---
 .../apache/doris/load/routineload/KafkaRoutineLoadJob.java   |  8 +++++++-
 .../org/apache/doris/load/routineload/RoutineLoadJob.java    | 12 ++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 292f87f8a22..abd1800a19d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -281,7 +281,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
 
         RLTaskTxnCommitAttachment commitAttach = new 
RLTaskTxnCommitAttachment(response.getCommitAttach());
-        updateProgress(commitAttach);
+        updateCloudProgress(commitAttach);
     }
 
     @Override
@@ -346,6 +346,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         updateProgressAndOffsetsCache(attachment);
     }
 
+    @Override
+    protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
+        super.updateCloudProgress(attachment);
+        updateProgressAndOffsetsCache(attachment);
+    }
+
     @Override
     protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo 
routineLoadTaskInfo) {
         KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index de1bffe1d56..2b8cbbd81ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -866,6 +866,18 @@ public abstract class RoutineLoadJob
                 attachment.getReceivedBytes(), false /* not replay */);
     }
 
+    protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
+        // In the cloud mode, the reason for needing to overwrite jobStatistic is that
+        // pulling the progress of meta service is equivalent to a replay operation of edit log,
+        // but this method will be called whenever scheduled by RoutineLoadScheduler,
+        // and accumulation will result in incorrect jobStatistic information.
+        this.jobStatistic.totalRows = attachment.getTotalRows();
+        this.jobStatistic.errorRows = attachment.getFilteredRows();
+        this.jobStatistic.unselectedRows = attachment.getUnselectedRows();
+        this.jobStatistic.receivedBytes = attachment.getReceivedBytes();
+        this.jobStatistic.totalTaskExcutionTimeMs = System.currentTimeMillis() 
- createTimestamp;
+    }
+
     private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, 
long unselectedRows, long receivedBytes,
                                  boolean isReplay) throws UserException {
         this.jobStatistic.totalRows += numOfTotalRows;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to