This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 6fe9d5d1af9 [improvement](sync version) fe sync version with be 
(#25718)
6fe9d5d1af9 is described below

commit 6fe9d5d1af9b9d5475a04925aea5f9e365108255
Author: yujun <yu.jun.re...@gmail.com>
AuthorDate: Sun Oct 22 00:47:56 2023 +0800

    [improvement](sync version) fe sync version with be (#25718)
---
 be/src/http/action/debug_point_action.cpp          |  19 ++-
 be/src/util/debug_points.cpp                       |  39 +++--
 be/src/util/debug_points.h                         |  72 ++++++++-
 be/test/util/debug_points_test.cpp                 |  40 +++++
 .../java/org/apache/doris/catalog/Replica.java     |  36 ++++-
 .../apache/doris/catalog/TabletInvertedIndex.java  |  22 ++-
 .../org/apache/doris/clone/TabletSchedCtx.java     |   7 +
 .../apache/doris/common/util/DebugPointUtil.java   |  95 +++++++++--
 .../apache/doris/httpv2/rest/DebugPointAction.java |  25 ++-
 .../java/org/apache/doris/master/MasterImpl.java   |   9 +-
 .../org/apache/doris/master/ReportHandler.java     |  42 +++--
 .../org/apache/doris/clone/RepairVersionTest.java  | 177 +++++++++++++++++++++
 .../doris/common/util/DebugPointUtilTest.java      |  18 +++
 .../apache/doris/utframe/TestWithFeService.java    |   2 +-
 14 files changed, 533 insertions(+), 70 deletions(-)

diff --git a/be/src/http/action/debug_point_action.cpp 
b/be/src/http/action/debug_point_action.cpp
index 08b1e116b2b..04aa38efaa4 100644
--- a/be/src/http/action/debug_point_action.cpp
+++ b/be/src/http/action/debug_point_action.cpp
@@ -21,6 +21,7 @@
 #include "http/http_channel.h"
 #include "http/http_status.h"
 #include "util/debug_points.h"
+#include "util/time.h"
 
 namespace doris {
 
@@ -43,17 +44,16 @@ void BaseDebugPointAction::handle(HttpRequest* req) {
 }
 
 Status AddDebugPointAction::_handle(HttpRequest* req) {
-    std::string debug_point = req->param("debug_point");
+    std::string name = req->param("debug_point");
     std::string execute = req->param("execute");
     std::string timeout = req->param("timeout");
-    if (debug_point.empty()) {
+    if (name.empty()) {
         return Status::InternalError("Empty debug point name");
     }
-    int64_t execute_limit = -1;
-    int64_t timeout_second = -1;
+    auto debug_point = std::make_shared<DebugPoint>();
     try {
         if (!execute.empty()) {
-            execute_limit = std::stol(execute);
+            debug_point->execute_limit = std::stol(execute);
         }
     } catch (const std::exception& e) {
         return Status::InternalError("Invalid execute limit format, execute 
{}, err {}", execute,
@@ -61,14 +61,19 @@ Status AddDebugPointAction::_handle(HttpRequest* req) {
     }
     try {
         if (!timeout.empty()) {
-            timeout_second = std::stol(timeout);
+            int64_t timeout_second = std::stol(timeout);
+            if (timeout_second > 0) {
+                debug_point->expire_ms = MonotonicMillis() + timeout_second * 
MILLIS_PER_SEC;
+            }
         }
     } catch (const std::exception& e) {
         return Status::InternalError("Invalid timeout format, timeout {}, err 
{}", timeout,
                                      e.what());
     }
 
-    DebugPoints::instance()->add(debug_point, execute_limit, timeout_second);
+    debug_point->params = *(req->params());
+
+    DebugPoints::instance()->add(name, debug_point);
 
     return Status::OK();
 }
diff --git a/be/src/util/debug_points.cpp b/be/src/util/debug_points.cpp
index 587f8c944a3..43bb39df9a4 100644
--- a/be/src/util/debug_points.cpp
+++ b/be/src/util/debug_points.cpp
@@ -30,37 +30,42 @@ DebugPoints* DebugPoints::instance() {
 }
 
 bool DebugPoints::is_enable(const std::string& name) {
+    return get_debug_point(name) != nullptr;
+}
+
+std::shared_ptr<DebugPoint> DebugPoints::get_debug_point(const std::string& 
name) {
     if (!config::enable_debug_points) {
-        return false;
+        return nullptr;
     }
     auto map_ptr = std::atomic_load_explicit(&_debug_points, 
std::memory_order_relaxed);
     auto it = map_ptr->find(name);
     if (it == map_ptr->end()) {
-        return false;
+        return nullptr;
     }
 
-    auto& debug_point = *(it->second);
-    if ((debug_point.expire_ms > 0 && MonotonicMillis() >= 
debug_point.expire_ms) ||
-        (debug_point.execute_limit > 0 &&
-         debug_point.execute_num.fetch_add(1, std::memory_order_relaxed) >=
-                 debug_point.execute_limit)) {
+    auto debug_point = it->second;
+    if ((debug_point->expire_ms > 0 && MonotonicMillis() >= 
debug_point->expire_ms) ||
+        (debug_point->execute_limit > 0 &&
+         debug_point->execute_num.fetch_add(1, std::memory_order_relaxed) >=
+                 debug_point->execute_limit)) {
         remove(name);
-        return false;
+        return nullptr;
     }
 
-    return true;
+    return debug_point;
 }
 
-void DebugPoints::add(const std::string& name, int64_t execute_limit, int64_t 
timeout_second) {
-    auto debug_point = std::make_shared<DebugPoint>();
-    debug_point->execute_limit = execute_limit;
-    if (timeout_second > 0) {
-        debug_point->expire_ms = MonotonicMillis() + timeout_second * 
MILLIS_PER_SEC;
-    }
+void DebugPoints::add(const std::string& name, std::shared_ptr<DebugPoint> 
debug_point) {
     update([&](DebugPointMap& new_points) { new_points[name] = debug_point; });
 
-    LOG(INFO) << "add debug point: name=" << name << ", execute=" << 
execute_limit
-              << ", timeout=" << timeout_second;
+    std::ostringstream oss;
+    oss << "{";
+    for (auto [key, value] : debug_point->params) {
+        oss << key << " : " << value << ", ";
+    }
+    oss << "}";
+
+    LOG(INFO) << "add debug point: name=" << name << ", params=" << oss.str();
 }
 
 void DebugPoints::remove(const std::string& name) {
diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h
index 704405689cc..1106a548f8d 100644
--- a/be/src/util/debug_points.h
+++ b/be/src/util/debug_points.h
@@ -18,19 +18,23 @@
 #pragma once
 
 #include <atomic>
+#include <boost/lexical_cast.hpp>
 #include <functional>
 #include <map>
 #include <memory>
-#include <string>
+#include <type_traits>
 
 #include "common/compiler_util.h"
 #include "common/config.h"
+#include "fmt/format.h"
 
-#define DBUG_EXECUTE_IF(debug_point, code)                     \
-    if (UNLIKELY(config::enable_debug_points)) {               \
-        if (DebugPoints::instance()->is_enable(debug_point)) { \
-            code;                                              \
-        }                                                      \
+// For more usage examples, see 'util/debug_points_test.cpp'.
+#define DBUG_EXECUTE_IF(debug_point_name, code)                               \
+    if (UNLIKELY(config::enable_debug_points)) {                              \
+        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
+        if (dp) {                                                             \
+            code;                                                             \
+        }                                                                     \
     }
 
 namespace doris {
@@ -39,15 +43,69 @@ struct DebugPoint {
     std::atomic<int64_t> execute_num {0};
     int64_t execute_limit = -1;
     int64_t expire_ms = -1;
+
+    std::map<std::string, std::string> params;
+
+    template <typename T>
+    T param(const std::string& key, T default_value = T()) {
+        auto it = params.find(key);
+        if (it == params.end()) {
+            return default_value;
+        }
+        if constexpr (std::is_same_v<T, bool>) {
+            if (it->second == "true") {
+                return true;
+            }
+            if (it->second == "false") {
+                return false;
+            }
+            return boost::lexical_cast<T>(it->second);
+        } else if constexpr (std::is_arithmetic_v<T>) {
+            return boost::lexical_cast<T>(it->second);
+        } else if constexpr (std::is_same_v<T, const char*>) {
+            return it->second.c_str();
+        } else {
+            static_assert(std::is_same_v<T, std::string>);
+            return it->second;
+        }
+    }
 };
 
 class DebugPoints {
 public:
     bool is_enable(const std::string& name);
-    void add(const std::string& name, int64_t execute_limit, int64_t 
timeout_second);
+    std::shared_ptr<DebugPoint> get_debug_point(const std::string& name);
     void remove(const std::string& name);
     void clear();
 
+    // If the debug point is not enabled or its params do not contain `key`, return 
`default_value`.
+    // url: /api/debug_point/add/name?k1=v1&k2=v2&...
+    template <typename T>
+    T get_debug_param_or_default(const std::string& name, const std::string& 
key,
+                                 const T& default_value) {
+        auto debug_point = get_debug_point(name);
+        return debug_point ? debug_point->param(key, default_value) : 
default_value;
+    }
+
+    // url: /api/debug_point/add/name?value=v
+    template <typename T>
+    T get_debug_param_or_default(const std::string& name, const T& 
default_value) {
+        return get_debug_param_or_default(name, "value", default_value);
+    }
+
+    void add(const std::string& name, std::shared_ptr<DebugPoint> debug_point);
+
+    // more 'add' functions for convenient use
+    void add(const std::string& name) { add(name, 
std::make_shared<DebugPoint>()); }
+    void add_with_params(const std::string& name,
+                         const std::map<std::string, std::string>& params) {
+        add(name, std::shared_ptr<DebugPoint>(new DebugPoint {.params = 
params}));
+    }
+    template <typename T>
+    void add_with_value(const std::string& name, const T& value) {
+        add_with_params(name, {{"value", fmt::format("{}", value)}});
+    }
+
     static DebugPoints* instance();
 
 private:
diff --git a/be/test/util/debug_points_test.cpp 
b/be/test/util/debug_points_test.cpp
index c2cf2bdedfd..76c4fd00781 100644
--- a/be/test/util/debug_points_test.cpp
+++ b/be/test/util/debug_points_test.cpp
@@ -54,6 +54,46 @@ TEST(DebugPointsTest, BaseTest) {
     EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug4"));
     std::this_thread::sleep_for(std::chrono::milliseconds(1000));
     EXPECT_FALSE(DebugPoints::instance()->is_enable("dbug4"));
+
+    
POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false");
+    EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug5"));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1, dp->param<int>("v1", 100)));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param<std::string>("v2")));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", std::string())));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("a", dp->param("v2", "b")));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param<double>("v3")));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3", 0.0)));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_TRUE(dp->param("v4", false)));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_FALSE(dp->param("v5", false)));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param<int64_t>("v_not_exist")));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist", 0L)));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(123, dp->param("v_not_exist", 123)));
+    DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("abcd", dp->param("v_not_exist", 
"abcd")));
+
+    EXPECT_EQ(1.2, 
DebugPoints::instance()->get_debug_param_or_default("dbug5", "v3", 0.0));
+    EXPECT_EQ(100,
+              
DebugPoints::instance()->get_debug_param_or_default("point_not_exists", "k", 
100));
+
+    POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug6?value=567");
+    EXPECT_EQ(567, 
DebugPoints::instance()->get_debug_param_or_default("dbug6", 0));
+}
+
+TEST(DebugPointsTest, AddTest) {
+    config::enable_debug_points = true;
+    DebugPoints::instance()->clear();
+
+    DebugPoints::instance()->add("dbug1");
+    EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug1"));
+
+    DebugPoints::instance()->add_with_params("dbug2", {{"k1", "100"}});
+    EXPECT_EQ(100, 
DebugPoints::instance()->get_debug_param_or_default("dbug2", "k1", 0));
+
+    DebugPoints::instance()->add_with_value("dbug3", 567);
+    EXPECT_EQ(567, 
DebugPoints::instance()->get_debug_param_or_default("dbug3", 567));
+
+    DebugPoints::instance()->add_with_value("dbug4", "hello");
+    EXPECT_EQ("hello",
+              
DebugPoints::instance()->get_debug_param_or_default<std::string>("dbug4", ""));
 }
 
 } // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index e55eab89392..631f2ebaf3b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.common.util.DebugPointUtil;
 import org.apache.doris.thrift.TUniqueId;
 
 import com.google.gson.annotations.SerializedName;
@@ -114,6 +115,14 @@ public class Replica implements Writable {
     private TUniqueId cooldownMetaId;
     private long cooldownTerm = -1;
 
+    // A replica's version should increase monotonically,
+    // but a backend may lose some versions due to disk failure or bugs.
+    // FE should detect this and mark the replica as missing versions.
+    // If the backend's reported version < FE's version, record the reported 
version as `regressiveVersion`,
+    // and if this persists for 5 minutes or more, FE should mark the replica as missing 
versions.
+    private long regressiveVersion = -1;
+    private long regressiveVersionTimestamp = 0;
+
     /*
      * This can happen when this replica is created by a balance clone task, 
and
      * when task finished, the version of this replica is behind the 
partition's visible version.
@@ -435,9 +444,9 @@ public class Replica implements Writable {
 
         if (lastFailedVersion != this.lastFailedVersion) {
             // Case 2:
-            if (lastFailedVersion > this.lastFailedVersion) {
+            if (lastFailedVersion > this.lastFailedVersion || 
lastFailedVersion < 0) {
                 this.lastFailedVersion = lastFailedVersion;
-                this.lastFailedTimestamp = System.currentTimeMillis();
+                this.lastFailedTimestamp = lastFailedVersion > 0 ? 
System.currentTimeMillis() : -1L;
             }
 
             this.lastSuccessVersion = this.version;
@@ -506,10 +515,6 @@ public class Replica implements Writable {
         return true;
     }
 
-    public void setLastFailedVersion(long lastFailedVersion) {
-        this.lastFailedVersion = lastFailedVersion;
-    }
-
     public void setState(ReplicaState replicaState) {
         this.state = replicaState;
     }
@@ -534,6 +539,25 @@ public class Replica implements Writable {
         this.versionCount = versionCount;
     }
 
+    public boolean checkVersionRegressive(long newVersion) {
+        if (newVersion >= version) {
+            regressiveVersion = -1;
+            regressiveVersionTimestamp = -1;
+            return false;
+        }
+
+        if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) 
{
+            return true;
+        }
+
+        if (newVersion != regressiveVersion) {
+            regressiveVersion = newVersion;
+            regressiveVersionTimestamp = System.currentTimeMillis();
+        }
+
+        return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 
60 * 1000L;
+    }
+
     @Override
     public String toString() {
         StringBuilder strBuffer = new StringBuilder("[replicaId=");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 2b601f9f030..a2d5983aac4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -390,10 +390,22 @@ public class TabletInvertedIndex {
         if (backendTabletInfo.getVersion() > versionInFe) {
             // backend replica's version is larger or newer than replica in 
FE, sync it.
             return true;
-        } else if (versionInFe == backendTabletInfo.getVersion() && 
replicaInFe.isBad()) {
+        } else if (versionInFe == backendTabletInfo.getVersion()) {
             // backend replica's version is equal to replica in FE, but 
replica in FE is bad,
             // while backend replica is good, sync it
-            return true;
+            if (replicaInFe.isBad()) {
+                return true;
+            }
+
+            // FE's replica last failed version > partition's committed 
version.
+            // This can occur when BE reports a missing version: FE sets last 
failed version = visible version + 1,
+            // so last failed version may be greater than the partition's committed 
version.
+            //
+            // The partition is not available here, so we only check 
lastFailedVersion == version + 1.
+            // ReportHandler.sync checks again whether last failed version > 
partition's committed version.
+            if (replicaInFe.getLastFailedVersion() == versionInFe + 1) {
+                return true;
+            }
         }
 
         return false;
@@ -501,6 +513,12 @@ public class TabletInvertedIndex {
             // so we only return true if version_miss is true.
             return true;
         }
+
+        // backend's version has regressed due to bugs
+        if 
(replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) {
+            return true;
+        }
+
         return false;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java 
b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
index b4667f80696..3f52210e8f1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java
@@ -1074,6 +1074,13 @@ public class TabletSchedCtx implements 
Comparable<TabletSchedCtx> {
 
             replica.updateVersionInfo(reportedTablet.getVersion(), 
reportedTablet.getDataSize(),
                     reportedTablet.getDataSize(), 
reportedTablet.getRowCount());
+            if (replica.getLastFailedVersion() > 
partition.getCommittedVersion()
+                    && reportedTablet.getVersion() >= 
partition.getCommittedVersion()
+                    //&& !(reportedTablet.isSetVersionMiss() && 
reportedTablet.isVersionMiss()
+                    && !(reportedTablet.isSetUsed() && 
!reportedTablet.isUsed())) {
+                LOG.info("change replica {} of tablet {} 's last failed 
version to -1", replica, tabletId);
+                replica.updateLastFailedVersion(-1L);
+            }
             if (reportedTablet.isSetPathHash()) {
                 replica.setPathHash(reportedTablet.getPathHash());
             }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
index aab9b8f2ba6..da06232f0c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java
@@ -19,6 +19,8 @@ package org.apache.doris.common.util;
 
 import org.apache.doris.common.Config;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -28,45 +30,114 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Use for manage debug points.
+ *
+ * For usage examples, see DebugPointUtilTest.java.
+ *
  **/
 public class DebugPointUtil {
     private static final Logger LOG = 
LogManager.getLogger(DebugPointUtil.class);
 
     private static final Map<String, DebugPoint> debugPoints = new 
ConcurrentHashMap<>();
 
-    private static class DebugPoint {
+    public static class DebugPoint {
         public AtomicInteger executeNum = new AtomicInteger(0);
         public int executeLimit = -1;
         public long expireTime = -1;
+
+        // params
+        public Map<String, String> params = Maps.newHashMap();
+
+        public <E> E param(String key, E defaultValue) {
+            Preconditions.checkState(defaultValue != null);
+
+            String value = params.get(key);
+            if (value == null) {
+                return defaultValue;
+            }
+            if (defaultValue instanceof Boolean) {
+                return (E) Boolean.valueOf(value);
+            }
+            if (defaultValue instanceof Byte) {
+                return (E) Byte.valueOf(value);
+            }
+            if (defaultValue instanceof Character) {
+                Preconditions.checkState(value.length() == 1);
+                return (E) Character.valueOf(value.charAt(0));
+            }
+            if (defaultValue instanceof Short) {
+                return (E) Short.valueOf(value);
+            }
+            if (defaultValue instanceof Integer) {
+                return (E) Integer.valueOf(value);
+            }
+            if (defaultValue instanceof Long) {
+                return (E) Long.valueOf(value);
+            }
+            if (defaultValue instanceof Float) {
+                return (E) Float.valueOf(value);
+            }
+            if (defaultValue instanceof Double) {
+                return (E) Double.valueOf(value);
+            }
+            if (defaultValue instanceof String) {
+                return (E) value;
+            }
+
+            Preconditions.checkState(false, "Can not convert with default 
value=" + defaultValue);
+
+            return defaultValue;
+        }
     }
 
     public static boolean isEnable(String debugPointName) {
+        return getDebugPoint(debugPointName) != null;
+    }
+
+    public static DebugPoint getDebugPoint(String debugPointName) {
         if (!Config.enable_debug_points) {
-            return false;
+            return null;
         }
 
         DebugPoint debugPoint = debugPoints.get(debugPointName);
         if (debugPoint == null) {
-            return false;
+            return null;
         }
 
         if ((debugPoint.expireTime > 0 && System.currentTimeMillis() >= 
debugPoint.expireTime)
                 || (debugPoint.executeLimit > 0 && 
debugPoint.executeNum.incrementAndGet() > debugPoint.executeLimit)) {
             debugPoints.remove(debugPointName);
-            return false;
+            return null;
         }
 
-        return true;
+        return debugPoint;
     }
 
-    public static void addDebugPoint(String name, int executeLimit, long 
timeoutSecond) {
-        DebugPoint debugPoint = new DebugPoint();
-        debugPoint.executeLimit = executeLimit;
-        if (timeoutSecond > 0) {
-            debugPoint.expireTime = System.currentTimeMillis() + timeoutSecond 
* 1000;
-        }
+    // If the debug point is not enabled or its params do not contain `key`, return 
`defaultValue`.
+    // url: /api/debug_point/add/name?k1=v1&k2=v2&...
+    public static <E> E getDebugParamOrDefault(String debugPointName, String 
key, E defaultValue) {
+        DebugPoint debugPoint = getDebugPoint(debugPointName);
+
+        return debugPoint != null ? debugPoint.param(key, defaultValue) : 
defaultValue;
+    }
+
+    // url: /api/debug_point/add/name?value=v
+    public static <E> E getDebugParamOrDefault(String debugPointName, E 
defaultValue) {
+        return getDebugParamOrDefault(debugPointName, "value", defaultValue);
+    }
+
+    public static void addDebugPoint(String name, DebugPoint debugPoint) {
         debugPoints.put(name, debugPoint);
-        LOG.info("add debug point: name={}, execute={}, timeout seconds={}", 
name, executeLimit, timeoutSecond);
+        LOG.info("add debug point: name={}, params={}", name, 
debugPoint.params);
+    }
+
+    public static void addDebugPoint(String name) {
+        addDebugPoint(name, new DebugPoint());
+    }
+
+    public static <E> void addDebugPointWithValue(String name, E value) {
+        DebugPoint debugPoint = new DebugPoint();
+        debugPoint.params.put("value", String.format("%s", value));
+        addDebugPoint(name, debugPoint);
     }
 
     public static void removeDebugPoint(String name) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java
index 25ee7a5d0a6..8c102fd0ae4 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java
@@ -19,6 +19,7 @@ package org.apache.doris.httpv2.rest;
 
 import org.apache.doris.common.Config;
 import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
 import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
@@ -40,7 +41,7 @@ import javax.servlet.http.HttpServletResponse;
 public class DebugPointAction extends RestBaseController {
 
     @RequestMapping(path = "/api/debug_point/add/{debugPoint}", method = 
RequestMethod.POST)
-    protected Object addDebugPoint(@PathVariable("debugPoint") String 
debugPoint,
+    protected Object addDebugPoint(@PathVariable("debugPoint") String name,
             @RequestParam(name = "execute", required = false, defaultValue = 
"") String execute,
             @RequestParam(name = "timeout", required = false, defaultValue = 
"") String timeout,
             HttpServletRequest request, HttpServletResponse response) {
@@ -50,28 +51,38 @@ public class DebugPointAction extends RestBaseController {
         }
         executeCheckPassword(request, response);
         checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), 
PrivPredicate.ADMIN);
-        if (Strings.isNullOrEmpty(debugPoint)) {
+        if (Strings.isNullOrEmpty(name)) {
             return ResponseEntityBuilder.badRequest("Empty debug point name.");
         }
-        int executeLimit = -1;
+
+        DebugPoint debugPoint = new DebugPoint();
         if (!Strings.isNullOrEmpty(execute)) {
             try {
-                executeLimit = Integer.valueOf(execute);
+                debugPoint.executeLimit = Integer.valueOf(execute);
             } catch (Exception e) {
                 return ResponseEntityBuilder.badRequest(
                         "Invalid execute format: " + execute + ", err " + 
e.getMessage());
             }
         }
-        long timeoutSeconds = -1;
         if (!Strings.isNullOrEmpty(timeout)) {
             try {
-                timeoutSeconds = Long.valueOf(timeout);
+                long timeoutSeconds = Long.valueOf(timeout);
+                if (timeoutSeconds > 0) {
+                    debugPoint.expireTime = System.currentTimeMillis() + 
timeoutSeconds * 1000;
+                }
             } catch (Exception e) {
                 return ResponseEntityBuilder.badRequest(
                         "Invalid timeout format: " + timeout + ", err " + 
e.getMessage());
             }
         }
-        DebugPointUtil.addDebugPoint(debugPoint, executeLimit, timeoutSeconds);
+        request.getParameterMap().forEach((key, values) -> {
+            if (values != null && values.length > 0) {
+                debugPoint.params.put(key, values[0]);
+            }
+        });
+
+        DebugPointUtil.addDebugPoint(name, debugPoint);
+
         return ResponseEntityBuilder.ok();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
index 2833eff5f3d..64b771663b2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -192,7 +192,7 @@ public class MasterImpl {
                     finishRecoverTablet(task);
                     break;
                 case ALTER:
-                    finishAlterTask(task);
+                    finishAlterTask(task, request);
                     break;
                 case ALTER_INVERTED_INDEX:
                     finishAlterInvertedIndexTask(task, request);
@@ -575,7 +575,7 @@ public class MasterImpl {
         return reportHandler.handleReport(request);
     }
 
-    private void finishAlterTask(AgentTask task) {
+    private void finishAlterTask(AgentTask task, TFinishTaskRequest request) {
         AlterReplicaTask alterTask = (AlterReplicaTask) task;
         try {
             if (alterTask.getJobType() == JobType.ROLLUP) {
@@ -584,6 +584,11 @@ public class MasterImpl {
                 
Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask);
             }
             alterTask.setFinished(true);
+            if (request.isSetReportVersion()) {
+                long reportVersion = request.getReportVersion();
+                Env.getCurrentSystemInfo().updateBackendReportVersion(
+                        task.getBackendId(), reportVersion, task.getDbId(), 
task.getTableId());
+            }
         } catch (MetaNotFoundException e) {
             LOG.warn("failed to handle finish alter task: {}, {}", 
task.getSignature(), e.getMessage());
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java 
b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index bfe1bb0a9e2..2de0c4e5050 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -403,7 +403,8 @@ public class ReportHandler extends Daemon {
         }
     }
 
-    private static void tabletReport(long backendId, Map<Long, TTablet> 
backendTablets, long backendReportVersion) {
+    // public so that FE unit tests can call it
+    public static void tabletReport(long backendId, Map<Long, TTablet> 
backendTablets, long backendReportVersion) {
         long start = System.currentTimeMillis();
         LOG.info("backend[{}] reports {} tablet(s). report version: {}",
                 backendId, backendTablets.size(), backendReportVersion);
@@ -607,6 +608,11 @@ public class ReportHandler extends Daemon {
                 if (olapTable == null || !olapTable.writeLockIfExist()) {
                     continue;
                 }
+
+                if (backendReportVersion < 
Env.getCurrentSystemInfo().getBackendReportVersion(backendId)) {
+                    break;
+                }
+
                 try {
                     long partitionId = tabletMeta.getPartitionId();
                     Partition partition = olapTable.getPartition(partitionId);
@@ -660,14 +666,25 @@ public class ReportHandler extends Daemon {
                             continue;
                         }
 
-                        if (metaVersion < backendVersion
-                                || (metaVersion == backendVersion && replica.isBad())) {
-
-                            if (backendReportVersion < Env.getCurrentSystemInfo()
-                                    .getBackendReportVersion(backendId)) {
-                                continue;
+                        boolean needSync = false;
+                        if (metaVersion < backendVersion) {
+                            needSync = true;
+                        } else if (metaVersion == backendVersion) {
+                            if (replica.isBad()) {
+                                needSync = true;
                             }
+                            if (replica.getVersion() >= partition.getCommittedVersion()
+                                    && replica.getLastFailedVersion() > partition.getCommittedVersion()) {
+                                LOG.info("sync replica {} of tablet {} in backend {} in db {}. replica last failed"
+                                        + " version change to -1 because last failed version > replica's committed"
+                                        + " version {}",
+                                        replica, tabletId, backendId, dbId, partition.getCommittedVersion());
+                                replica.updateLastFailedVersion(-1L);
+                                needSync = true;
+                            }
+                        }
 
+                        if (needSync) {
                             // happens when
                             // 1. PUSH finished in BE but failed or not yet report to FE
                             // 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE
@@ -1048,18 +1065,25 @@ public class ReportHandler extends Daemon {
                                 break;
                             }
 
-                            if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) {
+                            if ((tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss())
+                                    || replica.checkVersionRegressive(tTabletInfo.getVersion())) {
                                 // If the origin last failed version is larger than 0, not change it.
                                 // Otherwise, we set last failed version to replica'version + 1.
                                 // Because last failed version should always larger than replica's version.
                                 long newLastFailedVersion = replica.getLastFailedVersion();
                                 if (newLastFailedVersion < 0) {
                                     newLastFailedVersion = replica.getVersion() + 1;
+                                    replica.updateLastFailedVersion(newLastFailedVersion);
+                                    LOG.warn("set missing version for replica {} of tablet {} on backend {}, "
+                                            + "version in fe {}, version in be {}, be missing {}",
+                                            replica.getId(), tabletId, backendId, replica.getVersion(),
+                                            tTabletInfo.getVersion(), tTabletInfo.isVersionMiss());
                                 }
-                                replica.updateLastFailedVersion(newLastFailedVersion);
                                 backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion);
                                 break;
                             }
+
+                            break;
                         }
                     }
                 } finally {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
new file mode 100644
index 00000000000..f56a55d5874
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.clone;
+
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.MaterializedIndex;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.util.DebugPointUtil;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
+import org.apache.doris.master.ReportHandler;
+import org.apache.doris.thrift.TTablet;
+import org.apache.doris.thrift.TTabletInfo;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.Maps;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class RepairVersionTest extends TestWithFeService {
+    private class TableInfo {
+        Partition partition;
+        Tablet tablet;
+        Replica replica;
+    }
+
+    @Override
+    protected void beforeCreatingConnectContext() throws Exception {
+        Config.enable_debug_points = true;
+        Config.disable_balance = true;
+        Config.disable_tablet_scheduler = true;
+        Config.allow_replica_on_same_host = true;
+        FeConstants.tablet_checker_interval_ms = 100;
+        FeConstants.tablet_schedule_interval_ms = 100;
+    }
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+    }
+
+    @Override
+    protected int backendNum() {
+        return 2;
+    }
+
+    @Test
+    public void testRepairLastFailedVersionByClone() throws Exception {
+        TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_clone");
+        Partition partition = info.partition;
+        Replica replica = info.replica;
+
+        replica.updateLastFailedVersion(replica.getVersion() + 1);
+        Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion());
+
+        Config.disable_tablet_scheduler = false;
+        Thread.sleep(1000);
+        Config.disable_tablet_scheduler = true;
+
+        Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+    }
+
+    @Test
+    public void testRepairLastFailedVersionByReport() throws Exception {
+        TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_report");
+        Partition partition = info.partition;
+        Tablet tablet = info.tablet;
+        Replica replica = info.replica;
+
+        replica.updateLastFailedVersion(replica.getVersion() + 1);
+        Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion());
+
+        TTabletInfo tTabletInfo = new TTabletInfo();
+        tTabletInfo.setTabletId(tablet.getId());
+        tTabletInfo.setSchemaHash(replica.getSchemaHash());
+        tTabletInfo.setVersion(replica.getVersion());
+        tTabletInfo.setPathHash(replica.getPathHash());
+        tTabletInfo.setPartitionId(partition.getId());
+        tTabletInfo.setReplicaId(replica.getId());
+
+        TTablet tTablet = new TTablet();
+        tTablet.addToTabletInfos(tTabletInfo);
+        Map<Long, TTablet> tablets = Maps.newHashMap();
+        tablets.put(tablet.getId(), tTablet);
+        Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
+
+        ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+
+        Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+    }
+
+    @Test
+    public void testVersionRegressive() throws Exception {
+        TableInfo info = prepareTableForTest("tbl_version_regressive");
+        Partition partition = info.partition;
+        Tablet tablet = info.tablet;
+        Replica replica = info.replica;
+
+        Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+        Assertions.assertTrue(replica.getVersion() > 1L);
+
+        TTabletInfo tTabletInfo = new TTabletInfo();
+        tTabletInfo.setTabletId(tablet.getId());
+        tTabletInfo.setSchemaHash(replica.getSchemaHash());
+        tTabletInfo.setVersion(1L); // be report version = 1 which less than fe version
+        tTabletInfo.setPathHash(replica.getPathHash());
+        tTabletInfo.setPartitionId(partition.getId());
+        tTabletInfo.setReplicaId(replica.getId());
+
+        TTablet tTablet = new TTablet();
+        tTablet.addToTabletInfos(tTabletInfo);
+        Map<Long, TTablet> tablets = Maps.newHashMap();
+        tablets.put(tablet.getId(), tTablet);
+
+        ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+
+        DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint());
+        ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L);
+        Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion());
+
+        Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion());
+    }
+
+    private TableInfo prepareTableForTest(String tableName) throws Exception {
+        createTable("CREATE TABLE test." + tableName + " (k INT) DISTRIBUTED BY HASH(k) "
+                + " BUCKETS 1 PROPERTIES ( \"replication_num\" = \"2\" )");
+
+        Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test");
+        OlapTable tbl = (OlapTable) db.getTableOrMetaException(tableName);
+        Assertions.assertNotNull(tbl);
+        Partition partition = tbl.getPartitions().iterator().next();
+        Tablet tablet = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next()
+                .getTablets().iterator().next();
+
+        long visibleVersion = 2L;
+        partition.updateVisibleVersion(visibleVersion);
+        partition.setNextVersion(visibleVersion + 1);
+        tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L));
+
+        Replica replica = tablet.getReplicas().iterator().next();
+        Assertions.assertEquals(visibleVersion, replica.getVersion());
+        Assertions.assertEquals(-1L, replica.getLastFailedVersion());
+
+        TableInfo info = new TableInfo();
+        info.partition = partition;
+        info.tablet = tablet;
+        info.replica = replica;
+
+        return info;
+    }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
index 2845cec9225..0a68885bf26 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.common.util;
 
 import org.apache.doris.common.Config;
+import org.apache.doris.common.util.DebugPointUtil.DebugPoint;
 import org.apache.doris.http.DorisHttpTestCase;
 
 import okhttp3.Request;
@@ -54,6 +55,23 @@ public class DebugPointUtilTest extends DorisHttpTestCase {
         Assert.assertTrue(DebugPointUtil.isEnable("dbug4"));
         Thread.sleep(1000);
         Assert.assertFalse(DebugPointUtil.isEnable("dbug4"));
+
+        sendRequest("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false");
+        Assert.assertTrue(DebugPointUtil.isEnable("dbug5"));
+        DebugPoint debugPoint = DebugPointUtil.getDebugPoint("dbug5");
+        Assert.assertNotNull(debugPoint);
+        Assert.assertEquals(1, (int) debugPoint.param("v1", 0));
+        Assert.assertEquals("a", debugPoint.param("v2", ""));
+        Assert.assertEquals(1.2, debugPoint.param("v3", 0.0), 1e-6);
+        Assert.assertTrue(debugPoint.param("v4", false));
+        Assert.assertFalse(debugPoint.param("v5", false));
+        Assert.assertEquals(123L, (long) debugPoint.param("v_no_exist", 123L));
+
+        Assert.assertEquals(1, (int) DebugPointUtil.getDebugParamOrDefault("dbug5", "v1", 0));
+        Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("point_not_exists", "v1", 100));
+
+        sendRequest("/api/debug_point/add/dbug6?value=100");
+        Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("dbug6", 0));
     }
 
     private void sendRequest(String uri) throws Exception {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
index a17fcdf72bc..ac4fa2660db 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java
@@ -392,7 +392,7 @@ public abstract class TestWithFeService {
             InterruptedException {
         int feRpcPort = startFEServer(runningDir);
         List<Backend> bes = Lists.newArrayList();
-        System.out.println("start create backend");
+        System.out.println("start create backend, backend num " + backendNum);
         for (int i = 0; i < backendNum; i++) {
             bes.add(createBackend("127.0.0.1", feRpcPort));
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to