This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9a9463d5953 [fix](cloud) overwrite job statistic without data quality check when update cloud progress (#39790)
9a9463d5953 is described below

commit 9a9463d5953fc9af3697479d6c3d733fba59b555
Author: hui lai <1353307...@qq.com>
AuthorDate: Tue Aug 27 17:36:57 2024 +0800

    [fix](cloud) overwrite job statistic without data quality check when update cloud progress (#39790)
---
 .../apache/doris/load/routineload/KafkaRoutineLoadJob.java | 8 +++++++-
 .../org/apache/doris/load/routineload/RoutineLoadJob.java | 12 ++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 292f87f8a22..abd1800a19d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -281,7 +281,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         }
 
         RLTaskTxnCommitAttachment commitAttach = new RLTaskTxnCommitAttachment(response.getCommitAttach());
-        updateProgress(commitAttach);
+        updateCloudProgress(commitAttach);
     }
 
     @Override
@@ -346,6 +346,12 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
         updateProgressAndOffsetsCache(attachment);
     }
 
+    @Override
+    protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
+        super.updateCloudProgress(attachment);
+        updateProgressAndOffsetsCache(attachment);
+    }
+
     @Override
     protected RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo routineLoadTaskInfo) {
         KafkaTaskInfo oldKafkaTaskInfo = (KafkaTaskInfo) routineLoadTaskInfo;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index de1bffe1d56..2b8cbbd81ac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -866,6 +866,18 @@ public abstract class RoutineLoadJob
                 attachment.getReceivedBytes(), false /* not replay */);
     }
 
+    protected void updateCloudProgress(RLTaskTxnCommitAttachment attachment) {
+        // In the cloud mode, the reason for needing to overwrite jobStatistic is that
+        // pulling the progress of meta service is equivalent to a replay operation of edit log,
+        // but this method will be called whenever scheduled by RoutineLoadScheduler,
+        // and accumulation will result in incorrect jobStatistic information.
+        this.jobStatistic.totalRows = attachment.getTotalRows();
+        this.jobStatistic.errorRows = attachment.getFilteredRows();
+        this.jobStatistic.unselectedRows = attachment.getUnselectedRows();
+        this.jobStatistic.receivedBytes = attachment.getReceivedBytes();
+        this.jobStatistic.totalTaskExcutionTimeMs = System.currentTimeMillis() - createTimestamp;
+    }
+
     private void updateNumOfData(long numOfTotalRows, long numOfErrorRows, long unselectedRows, long receivedBytes,
             boolean isReplay) throws UserException {
         this.jobStatistic.totalRows += numOfTotalRows;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org