This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ed07dc8596a [fix](cloud) fix routine load loss data when fe master node restart (#46149)
ed07dc8596a is described below

commit ed07dc8596a70ab7e8aa44f0f222218b3e225663
Author: hui lai <[email protected]>
AuthorDate: Tue Jan 7 22:31:52 2025 +0800

    [fix](cloud) fix routine load loss data when fe master node restart (#46149)
    
    
    In cloud mode, routine load loses data when the FE master node restarts.
    
    When updating progress, PR https://github.com/apache/doris/pull/39313
    was introduced to prevent smaller offset values from overwriting larger
    ones. Because of that PR, a problem arises when routine load replays
    progress metadata. It first obtains the configured default offset, and
    only then pulls metadata from the meta service to update the local
    value. If the value pulled from the meta service is not larger than
    that default offset, the correct value is never assigned in memory.
    
    To solve this problem, progress metadata is now pulled from the meta
    service first on restart. Based on the pulled value, the job then
    decides whether it needs to obtain the default offset from Kafka.
---
 .../java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java  | 5 +++++
 .../java/org/apache/doris/load/routineload/RoutineLoadScheduler.java | 5 -----
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 8cb0898eda8..0376cd3f366 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -366,6 +366,11 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
 
     @Override
     protected void unprotectUpdateProgress() throws UserException {
+        // For cloud mode, should update cloud progress from meta service,
+        // then update progress with default offset from Kafka if necessary.
+        if (Config.isCloudMode()) {
+            updateCloudProgress();
+        }
         updateNewPartitionProgress();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index 51029c3d18b..023cd239e09 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -18,7 +18,6 @@
 package org.apache.doris.load.routineload;
 
 import org.apache.doris.catalog.Env;
-import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -79,10 +78,6 @@ public class RoutineLoadScheduler extends MasterDaemon {
             RoutineLoadJob.JobState errorJobState = null;
             UserException userException = null;
             try {
-                if (Config.isCloudMode()) {
-                    routineLoadJob.updateCloudProgress();
-                }
-
                 routineLoadJob.prepare();
                 // judge nums of tasks more than max concurrent tasks of cluster
                 int desiredConcurrentTaskNum = routineLoadJob.calculateCurrentConcurrentTaskNum();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to