This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 6fe9d5d1af9 [improvement](sync version) fe sync version with be (#25718) 6fe9d5d1af9 is described below commit 6fe9d5d1af9b9d5475a04925aea5f9e365108255 Author: yujun <yu.jun.re...@gmail.com> AuthorDate: Sun Oct 22 00:47:56 2023 +0800 [improvement](sync version) fe sync version with be (#25718) --- be/src/http/action/debug_point_action.cpp | 19 ++- be/src/util/debug_points.cpp | 39 +++-- be/src/util/debug_points.h | 72 ++++++++- be/test/util/debug_points_test.cpp | 40 +++++ .../java/org/apache/doris/catalog/Replica.java | 36 ++++- .../apache/doris/catalog/TabletInvertedIndex.java | 22 ++- .../org/apache/doris/clone/TabletSchedCtx.java | 7 + .../apache/doris/common/util/DebugPointUtil.java | 95 +++++++++-- .../apache/doris/httpv2/rest/DebugPointAction.java | 25 ++- .../java/org/apache/doris/master/MasterImpl.java | 9 +- .../org/apache/doris/master/ReportHandler.java | 42 +++-- .../org/apache/doris/clone/RepairVersionTest.java | 177 +++++++++++++++++++++ .../doris/common/util/DebugPointUtilTest.java | 18 +++ .../apache/doris/utframe/TestWithFeService.java | 2 +- 14 files changed, 533 insertions(+), 70 deletions(-) diff --git a/be/src/http/action/debug_point_action.cpp b/be/src/http/action/debug_point_action.cpp index 08b1e116b2b..04aa38efaa4 100644 --- a/be/src/http/action/debug_point_action.cpp +++ b/be/src/http/action/debug_point_action.cpp @@ -21,6 +21,7 @@ #include "http/http_channel.h" #include "http/http_status.h" #include "util/debug_points.h" +#include "util/time.h" namespace doris { @@ -43,17 +44,16 @@ void BaseDebugPointAction::handle(HttpRequest* req) { } Status AddDebugPointAction::_handle(HttpRequest* req) { - std::string debug_point = req->param("debug_point"); + std::string name = req->param("debug_point"); std::string execute = req->param("execute"); std::string timeout = req->param("timeout"); - if (debug_point.empty()) { + if (name.empty()) { return 
Status::InternalError("Empty debug point name"); } - int64_t execute_limit = -1; - int64_t timeout_second = -1; + auto debug_point = std::make_shared<DebugPoint>(); try { if (!execute.empty()) { - execute_limit = std::stol(execute); + debug_point->execute_limit = std::stol(execute); } } catch (const std::exception& e) { return Status::InternalError("Invalid execute limit format, execute {}, err {}", execute, @@ -61,14 +61,19 @@ Status AddDebugPointAction::_handle(HttpRequest* req) { } try { if (!timeout.empty()) { - timeout_second = std::stol(timeout); + int64_t timeout_second = std::stol(timeout); + if (timeout_second > 0) { + debug_point->expire_ms = MonotonicMillis() + timeout_second * MILLIS_PER_SEC; + } } } catch (const std::exception& e) { return Status::InternalError("Invalid timeout format, timeout {}, err {}", timeout, e.what()); } - DebugPoints::instance()->add(debug_point, execute_limit, timeout_second); + debug_point->params = *(req->params()); + + DebugPoints::instance()->add(name, debug_point); return Status::OK(); } diff --git a/be/src/util/debug_points.cpp b/be/src/util/debug_points.cpp index 587f8c944a3..43bb39df9a4 100644 --- a/be/src/util/debug_points.cpp +++ b/be/src/util/debug_points.cpp @@ -30,37 +30,42 @@ DebugPoints* DebugPoints::instance() { } bool DebugPoints::is_enable(const std::string& name) { + return get_debug_point(name) != nullptr; +} + +std::shared_ptr<DebugPoint> DebugPoints::get_debug_point(const std::string& name) { if (!config::enable_debug_points) { - return false; + return nullptr; } auto map_ptr = std::atomic_load_explicit(&_debug_points, std::memory_order_relaxed); auto it = map_ptr->find(name); if (it == map_ptr->end()) { - return false; + return nullptr; } - auto& debug_point = *(it->second); - if ((debug_point.expire_ms > 0 && MonotonicMillis() >= debug_point.expire_ms) || - (debug_point.execute_limit > 0 && - debug_point.execute_num.fetch_add(1, std::memory_order_relaxed) >= - debug_point.execute_limit)) { + auto 
debug_point = it->second; + if ((debug_point->expire_ms > 0 && MonotonicMillis() >= debug_point->expire_ms) || + (debug_point->execute_limit > 0 && + debug_point->execute_num.fetch_add(1, std::memory_order_relaxed) >= + debug_point->execute_limit)) { remove(name); - return false; + return nullptr; } - return true; + return debug_point; } -void DebugPoints::add(const std::string& name, int64_t execute_limit, int64_t timeout_second) { - auto debug_point = std::make_shared<DebugPoint>(); - debug_point->execute_limit = execute_limit; - if (timeout_second > 0) { - debug_point->expire_ms = MonotonicMillis() + timeout_second * MILLIS_PER_SEC; - } +void DebugPoints::add(const std::string& name, std::shared_ptr<DebugPoint> debug_point) { update([&](DebugPointMap& new_points) { new_points[name] = debug_point; }); - LOG(INFO) << "add debug point: name=" << name << ", execute=" << execute_limit - << ", timeout=" << timeout_second; + std::ostringstream oss; + oss << "{"; + for (auto [key, value] : debug_point->params) { + oss << key << " : " << value << ", "; + } + oss << "}"; + + LOG(INFO) << "add debug point: name=" << name << ", params=" << oss.str(); } void DebugPoints::remove(const std::string& name) { diff --git a/be/src/util/debug_points.h b/be/src/util/debug_points.h index 704405689cc..1106a548f8d 100644 --- a/be/src/util/debug_points.h +++ b/be/src/util/debug_points.h @@ -18,19 +18,23 @@ #pragma once #include <atomic> +#include <boost/lexical_cast.hpp> #include <functional> #include <map> #include <memory> -#include <string> +#include <type_traits> #include "common/compiler_util.h" #include "common/config.h" +#include "fmt/format.h" -#define DBUG_EXECUTE_IF(debug_point, code) \ - if (UNLIKELY(config::enable_debug_points)) { \ - if (DebugPoints::instance()->is_enable(debug_point)) { \ - code; \ - } \ +// more usage can see 'util/debug_points_test.cpp' +#define DBUG_EXECUTE_IF(debug_point_name, code) \ + if (UNLIKELY(config::enable_debug_points)) { \ + auto dp = 
DebugPoints::instance()->get_debug_point(debug_point_name); \ + if (dp) { \ + code; \ + } \ } namespace doris { @@ -39,15 +43,69 @@ struct DebugPoint { std::atomic<int64_t> execute_num {0}; int64_t execute_limit = -1; int64_t expire_ms = -1; + + std::map<std::string, std::string> params; + + template <typename T> + T param(const std::string& key, T default_value = T()) { + auto it = params.find(key); + if (it == params.end()) { + return default_value; + } + if constexpr (std::is_same_v<T, bool>) { + if (it->second == "true") { + return true; + } + if (it->second == "false") { + return false; + } + return boost::lexical_cast<T>(it->second); + } else if constexpr (std::is_arithmetic_v<T>) { + return boost::lexical_cast<T>(it->second); + } else if constexpr (std::is_same_v<T, const char*>) { + return it->second.c_str(); + } else { + static_assert(std::is_same_v<T, std::string>); + return it->second; + } + } }; class DebugPoints { public: bool is_enable(const std::string& name); - void add(const std::string& name, int64_t execute_limit, int64_t timeout_second); + std::shared_ptr<DebugPoint> get_debug_point(const std::string& name); void remove(const std::string& name); void clear(); + // if not enable debug point or its params not contains `key`, then return `default_value` + // url: /api/debug_point/add/name?k1=v1&k2=v2&... + template <typename T> + T get_debug_param_or_default(const std::string& name, const std::string& key, + const T& default_value) { + auto debug_point = get_debug_point(name); + return debug_point ? 
debug_point->param(key, default_value) : default_value; + } + + // url: /api/debug_point/add/name?value=v + template <typename T> + T get_debug_param_or_default(const std::string& name, const T& default_value) { + return get_debug_param_or_default(name, "value", default_value); + } + + void add(const std::string& name, std::shared_ptr<DebugPoint> debug_point); + + // more 'add' functions for convenient use + void add(const std::string& name) { add(name, std::make_shared<DebugPoint>()); } + void add_with_params(const std::string& name, + const std::map<std::string, std::string>& params) { + add(name, std::shared_ptr<DebugPoint>(new DebugPoint {.params = params})); + } + template <typename T> + void add_with_value(const std::string& name, const T& value) { + add_with_params(name, {{"value", fmt::format("{}", value)}}); + } + static DebugPoints* instance(); private: diff --git a/be/test/util/debug_points_test.cpp b/be/test/util/debug_points_test.cpp index c2cf2bdedfd..76c4fd00781 100644 --- a/be/test/util/debug_points_test.cpp +++ b/be/test/util/debug_points_test.cpp @@ -54,6 +54,46 @@ TEST(DebugPointsTest, BaseTest) { EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug4")); std::this_thread::sleep_for(std::chrono::milliseconds(1000)); EXPECT_FALSE(DebugPoints::instance()->is_enable("dbug4")); + + POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false"); + EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug5")); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1, dp->param<int>("v1", 100))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param<std::string>("v2"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ("a", dp->param("v2", std::string()))); + DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("a", dp->param("v2", "b"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param<double>("v3"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(1.2, dp->param("v3", 0.0))); + DBUG_EXECUTE_IF("dbug5", EXPECT_TRUE(dp->param("v4", false))); + DBUG_EXECUTE_IF("dbug5", 
EXPECT_FALSE(dp->param("v5", false))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param<int64_t>("v_not_exist"))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(0L, dp->param("v_not_exist", 0L))); + DBUG_EXECUTE_IF("dbug5", EXPECT_EQ(123, dp->param("v_not_exist", 123))); + DBUG_EXECUTE_IF("dbug5", EXPECT_STREQ("abcd", dp->param("v_not_exist", "abcd"))); + + EXPECT_EQ(1.2, DebugPoints::instance()->get_debug_param_or_default("dbug5", "v3", 0.0)); + EXPECT_EQ(100, + DebugPoints::instance()->get_debug_param_or_default("point_not_exists", "k", 100)); + + POST_HTTP_TO_TEST_SERVER("/api/debug_point/add/dbug6?value=567"); + EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug6", 0)); +} + +TEST(DebugPointsTest, AddTest) { + config::enable_debug_points = true; + DebugPoints::instance()->clear(); + + DebugPoints::instance()->add("dbug1"); + EXPECT_TRUE(DebugPoints::instance()->is_enable("dbug1")); + + DebugPoints::instance()->add_with_params("dbug2", {{"k1", "100"}}); + EXPECT_EQ(100, DebugPoints::instance()->get_debug_param_or_default("dbug2", "k1", 0)); + + DebugPoints::instance()->add_with_value("dbug3", 567); + EXPECT_EQ(567, DebugPoints::instance()->get_debug_param_or_default("dbug3", 567)); + + DebugPoints::instance()->add_with_value("dbug4", "hello"); + EXPECT_EQ("hello", + DebugPoints::instance()->get_debug_param_or_default<std::string>("dbug4", "")); } } // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java index e55eab89392..631f2ebaf3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java @@ -20,6 +20,7 @@ package org.apache.doris.catalog; import org.apache.doris.common.Config; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.thrift.TUniqueId; 
import com.google.gson.annotations.SerializedName; @@ -114,6 +115,14 @@ public class Replica implements Writable { private TUniqueId cooldownMetaId; private long cooldownTerm = -1; + // A replica version should increase monotonically, + // but backend may missing some versions due to disk failure or bugs. + // FE should found these and mark the replica as missing versions. + // If backend's report version < fe version, record the backend's report version as `regressiveVersion`, + // and if time exceed 5min, fe should mark this replica as missing versions. + private long regressiveVersion = -1; + private long regressiveVersionTimestamp = 0; + /* * This can happen when this replica is created by a balance clone task, and * when task finished, the version of this replica is behind the partition's visible version. @@ -435,9 +444,9 @@ public class Replica implements Writable { if (lastFailedVersion != this.lastFailedVersion) { // Case 2: - if (lastFailedVersion > this.lastFailedVersion) { + if (lastFailedVersion > this.lastFailedVersion || lastFailedVersion < 0) { this.lastFailedVersion = lastFailedVersion; - this.lastFailedTimestamp = System.currentTimeMillis(); + this.lastFailedTimestamp = lastFailedVersion > 0 ? 
System.currentTimeMillis() : -1L; } this.lastSuccessVersion = this.version; @@ -506,10 +515,6 @@ public class Replica implements Writable { return true; } - public void setLastFailedVersion(long lastFailedVersion) { - this.lastFailedVersion = lastFailedVersion; - } - public void setState(ReplicaState replicaState) { this.state = replicaState; } @@ -534,6 +539,25 @@ public class Replica implements Writable { this.versionCount = versionCount; } + public boolean checkVersionRegressive(long newVersion) { + if (newVersion >= version) { + regressiveVersion = -1; + regressiveVersionTimestamp = -1; + return false; + } + + if (DebugPointUtil.isEnable("Replica.regressive_version_immediately")) { + return true; + } + + if (newVersion != regressiveVersion) { + regressiveVersion = newVersion; + regressiveVersionTimestamp = System.currentTimeMillis(); + } + + return System.currentTimeMillis() - regressiveVersionTimestamp >= 5 * 60 * 1000L; + } + @Override public String toString() { StringBuilder strBuffer = new StringBuilder("[replicaId="); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 2b601f9f030..a2d5983aac4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -390,10 +390,22 @@ public class TabletInvertedIndex { if (backendTabletInfo.getVersion() > versionInFe) { // backend replica's version is larger or newer than replica in FE, sync it. 
return true; - } else if (versionInFe == backendTabletInfo.getVersion() && replicaInFe.isBad()) { + } else if (versionInFe == backendTabletInfo.getVersion()) { // backend replica's version is equal to replica in FE, but replica in FE is bad, // while backend replica is good, sync it - return true; + if (replicaInFe.isBad()) { + return true; + } + + // FE' s replica last failed version > partition's committed version + // this can be occur when be report miss version, fe will set last failed version = visible version + 1 + // then last failed version may greater than partition's committed version + // + // But here cannot got variable partition, we just check lastFailedVersion = version + 1, + // In ReportHandler.sync, we will check if last failed version > partition's committed version again. + if (replicaInFe.getLastFailedVersion() == versionInFe + 1) { + return true; + } } return false; @@ -501,6 +513,12 @@ public class TabletInvertedIndex { // so we only return true if version_miss is true. 
return true; } + + // backend versions regressive due to bugs + if (replicaInFe.checkVersionRegressive(backendTabletInfo.getVersion())) { + return true; + } + return false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index b4667f80696..3f52210e8f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -1074,6 +1074,13 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> { replica.updateVersionInfo(reportedTablet.getVersion(), reportedTablet.getDataSize(), reportedTablet.getDataSize(), reportedTablet.getRowCount()); + if (replica.getLastFailedVersion() > partition.getCommittedVersion() + && reportedTablet.getVersion() >= partition.getCommittedVersion() + //&& !(reportedTablet.isSetVersionMiss() && reportedTablet.isVersionMiss() + && !(reportedTablet.isSetUsed() && !reportedTablet.isUsed())) { + LOG.info("change replica {} of tablet {} 's last failed version to -1", replica, tabletId); + replica.updateLastFailedVersion(-1L); + } if (reportedTablet.isSetPathHash()) { replica.setPathHash(reportedTablet.getPathHash()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java index aab9b8f2ba6..da06232f0c0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DebugPointUtil.java @@ -19,6 +19,8 @@ package org.apache.doris.common.util; import org.apache.doris.common.Config; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -28,45 +30,114 @@ import java.util.concurrent.atomic.AtomicInteger; /** * Use for manage debug points. 
+ * + * usage example can see DebugPointUtilTest.java + * **/ public class DebugPointUtil { private static final Logger LOG = LogManager.getLogger(DebugPointUtil.class); private static final Map<String, DebugPoint> debugPoints = new ConcurrentHashMap<>(); - private static class DebugPoint { + public static class DebugPoint { public AtomicInteger executeNum = new AtomicInteger(0); public int executeLimit = -1; public long expireTime = -1; + + // params + public Map<String, String> params = Maps.newHashMap(); + + public <E> E param(String key, E defaultValue) { + Preconditions.checkState(defaultValue != null); + + String value = params.get(key); + if (value == null) { + return defaultValue; + } + if (defaultValue instanceof Boolean) { + return (E) Boolean.valueOf(value); + } + if (defaultValue instanceof Byte) { + return (E) Byte.valueOf(value); + } + if (defaultValue instanceof Character) { + Preconditions.checkState(value.length() == 1); + return (E) Character.valueOf(value.charAt(0)); + } + if (defaultValue instanceof Short) { + return (E) Short.valueOf(value); + } + if (defaultValue instanceof Integer) { + return (E) Integer.valueOf(value); + } + if (defaultValue instanceof Long) { + return (E) Long.valueOf(value); + } + if (defaultValue instanceof Float) { + return (E) Float.valueOf(value); + } + if (defaultValue instanceof Double) { + return (E) Double.valueOf(value); + } + if (defaultValue instanceof String) { + return (E) value; + } + + Preconditions.checkState(false, "Can not convert with default value=" + defaultValue); + + return defaultValue; + } } public static boolean isEnable(String debugPointName) { + return getDebugPoint(debugPointName) != null; + } + + public static DebugPoint getDebugPoint(String debugPointName) { if (!Config.enable_debug_points) { - return false; + return null; } DebugPoint debugPoint = debugPoints.get(debugPointName); if (debugPoint == null) { - return false; + return null; } if ((debugPoint.expireTime > 0 && 
System.currentTimeMillis() >= debugPoint.expireTime) || (debugPoint.executeLimit > 0 && debugPoint.executeNum.incrementAndGet() > debugPoint.executeLimit)) { debugPoints.remove(debugPointName); - return false; + return null; } - return true; + return debugPoint; } - public static void addDebugPoint(String name, int executeLimit, long timeoutSecond) { - DebugPoint debugPoint = new DebugPoint(); - debugPoint.executeLimit = executeLimit; - if (timeoutSecond > 0) { - debugPoint.expireTime = System.currentTimeMillis() + timeoutSecond * 1000; - } + // if not enable debug point or its params not contains `key`, then return `defaultValue` + // url: /api/debug_point/add/name?k1=v1&k2=v2&... + public static <E> E getDebugParamOrDefault(String debugPointName, String key, E defaultValue) { + DebugPoint debugPoint = getDebugPoint(debugPointName); + + return debugPoint != null ? debugPoint.param(key, defaultValue) : defaultValue; + } + + // url: /api/debug_point/add/name?value=v + public static <E> E getDebugParamOrDefault(String debugPointName, E defaultValue) { + return getDebugParamOrDefault(debugPointName, "value", defaultValue); + } + + public static void addDebugPoint(String name, DebugPoint debugPoint) { debugPoints.put(name, debugPoint); - LOG.info("add debug point: name={}, execute={}, timeout seconds={}", name, executeLimit, timeoutSecond); + LOG.info("add debug point: name={}, params={}", name, debugPoint.params); + } + + public static void addDebugPoint(String name) { + addDebugPoint(name, new DebugPoint()); + } + + public static <E> void addDebugPointWithValue(String name, E value) { + DebugPoint debugPoint = new DebugPoint(); + debugPoint.params.put("value", String.format("%s", value)); + addDebugPoint(name, debugPoint); } public static void removeDebugPoint(String name) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java index 
25ee7a5d0a6..8c102fd0ae4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/DebugPointAction.java @@ -19,6 +19,7 @@ package org.apache.doris.httpv2.rest; import org.apache.doris.common.Config; import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -40,7 +41,7 @@ import javax.servlet.http.HttpServletResponse; public class DebugPointAction extends RestBaseController { @RequestMapping(path = "/api/debug_point/add/{debugPoint}", method = RequestMethod.POST) - protected Object addDebugPoint(@PathVariable("debugPoint") String debugPoint, + protected Object addDebugPoint(@PathVariable("debugPoint") String name, @RequestParam(name = "execute", required = false, defaultValue = "") String execute, @RequestParam(name = "timeout", required = false, defaultValue = "") String timeout, HttpServletRequest request, HttpServletResponse response) { @@ -50,28 +51,38 @@ public class DebugPointAction extends RestBaseController { } executeCheckPassword(request, response); checkGlobalAuth(ConnectContext.get().getCurrentUserIdentity(), PrivPredicate.ADMIN); - if (Strings.isNullOrEmpty(debugPoint)) { + if (Strings.isNullOrEmpty(name)) { return ResponseEntityBuilder.badRequest("Empty debug point name."); } - int executeLimit = -1; + + DebugPoint debugPoint = new DebugPoint(); if (!Strings.isNullOrEmpty(execute)) { try { - executeLimit = Integer.valueOf(execute); + debugPoint.executeLimit = Integer.valueOf(execute); } catch (Exception e) { return ResponseEntityBuilder.badRequest( "Invalid execute format: " + execute + ", err " + e.getMessage()); } } - long timeoutSeconds = -1; if (!Strings.isNullOrEmpty(timeout)) { try { - timeoutSeconds = Long.valueOf(timeout); + long 
timeoutSeconds = Long.valueOf(timeout); + if (timeoutSeconds > 0) { + debugPoint.expireTime = System.currentTimeMillis() + timeoutSeconds * 1000; + } } catch (Exception e) { return ResponseEntityBuilder.badRequest( "Invalid timeout format: " + timeout + ", err " + e.getMessage()); } } - DebugPointUtil.addDebugPoint(debugPoint, executeLimit, timeoutSeconds); + request.getParameterMap().forEach((key, values) -> { + if (values != null && values.length > 0) { + debugPoint.params.put(key, values[0]); + } + }); + + DebugPointUtil.addDebugPoint(name, debugPoint); + return ResponseEntityBuilder.ok(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java index 2833eff5f3d..64b771663b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MasterImpl.java @@ -192,7 +192,7 @@ public class MasterImpl { finishRecoverTablet(task); break; case ALTER: - finishAlterTask(task); + finishAlterTask(task, request); break; case ALTER_INVERTED_INDEX: finishAlterInvertedIndexTask(task, request); @@ -575,7 +575,7 @@ public class MasterImpl { return reportHandler.handleReport(request); } - private void finishAlterTask(AgentTask task) { + private void finishAlterTask(AgentTask task, TFinishTaskRequest request) { AlterReplicaTask alterTask = (AlterReplicaTask) task; try { if (alterTask.getJobType() == JobType.ROLLUP) { @@ -584,6 +584,11 @@ public class MasterImpl { Env.getCurrentEnv().getSchemaChangeHandler().handleFinishAlterTask(alterTask); } alterTask.setFinished(true); + if (request.isSetReportVersion()) { + long reportVersion = request.getReportVersion(); + Env.getCurrentSystemInfo().updateBackendReportVersion( + task.getBackendId(), reportVersion, task.getDbId(), task.getTableId()); + } } catch (MetaNotFoundException e) { LOG.warn("failed to handle finish alter task: {}, {}", task.getSignature(), e.getMessage()); } diff 
--git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index bfe1bb0a9e2..2de0c4e5050 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -403,7 +403,8 @@ public class ReportHandler extends Daemon { } } - private static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) { + // public for fe ut + public static void tabletReport(long backendId, Map<Long, TTablet> backendTablets, long backendReportVersion) { long start = System.currentTimeMillis(); LOG.info("backend[{}] reports {} tablet(s). report version: {}", backendId, backendTablets.size(), backendReportVersion); @@ -607,6 +608,11 @@ public class ReportHandler extends Daemon { if (olapTable == null || !olapTable.writeLockIfExist()) { continue; } + + if (backendReportVersion < Env.getCurrentSystemInfo().getBackendReportVersion(backendId)) { + break; + } + try { long partitionId = tabletMeta.getPartitionId(); Partition partition = olapTable.getPartition(partitionId); @@ -660,14 +666,25 @@ public class ReportHandler extends Daemon { continue; } - if (metaVersion < backendVersion - || (metaVersion == backendVersion && replica.isBad())) { - - if (backendReportVersion < Env.getCurrentSystemInfo() - .getBackendReportVersion(backendId)) { - continue; + boolean needSync = false; + if (metaVersion < backendVersion) { + needSync = true; + } else if (metaVersion == backendVersion) { + if (replica.isBad()) { + needSync = true; } + if (replica.getVersion() >= partition.getCommittedVersion() + && replica.getLastFailedVersion() > partition.getCommittedVersion()) { + LOG.info("sync replica {} of tablet {} in backend {} in db {}. 
replica last failed" + + " version change to -1 because last failed version > replica's committed" + + " version {}", + replica, tabletId, backendId, dbId, partition.getCommittedVersion()); + replica.updateLastFailedVersion(-1L); + needSync = true; + } + } + if (needSync) { // happens when // 1. PUSH finished in BE but failed or not yet report to FE // 2. repair for VERSION_INCOMPLETE finished in BE, but failed or not yet report to FE @@ -1048,18 +1065,25 @@ public class ReportHandler extends Daemon { break; } - if (tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) { + if ((tTabletInfo.isSetVersionMiss() && tTabletInfo.isVersionMiss()) + || replica.checkVersionRegressive(tTabletInfo.getVersion())) { // If the origin last failed version is larger than 0, not change it. // Otherwise, we set last failed version to replica'version + 1. // Because last failed version should always larger than replica's version. long newLastFailedVersion = replica.getLastFailedVersion(); if (newLastFailedVersion < 0) { newLastFailedVersion = replica.getVersion() + 1; + replica.updateLastFailedVersion(newLastFailedVersion); + LOG.warn("set missing version for replica {} of tablet {} on backend {}, " + + "version in fe {}, version in be {}, be missing {}", + replica.getId(), tabletId, backendId, replica.getVersion(), + tTabletInfo.getVersion(), tTabletInfo.isVersionMiss()); } - replica.updateLastFailedVersion(newLastFailedVersion); backendReplicasInfo.addMissingVersionReplica(tabletId, newLastFailedVersion); break; } + + break; } } } finally { diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java new file mode 100644 index 00000000000..f56a55d5874 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/RepairVersionTest.java @@ -0,0 +1,177 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.clone; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.Tablet; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.DebugPointUtil; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; +import org.apache.doris.master.ReportHandler; +import org.apache.doris.thrift.TTablet; +import org.apache.doris.thrift.TTabletInfo; +import org.apache.doris.utframe.TestWithFeService; + +import com.google.common.collect.Maps; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Map; + +public class RepairVersionTest extends TestWithFeService { + private class TableInfo { /* Handles returned by prepareTableForTest(): the first partition of the created table, the first tablet of its first materialized index, and the first replica of that tablet. */ + Partition partition; + Tablet tablet; + Replica replica; + } + /** Class-wide FE setup: enables debug points, disables balancing and the tablet scheduler (tests that need the scheduler switch it on temporarily), allows replicas on the same host (presumably needed because every test backend is created on 127.0.0.1), and shortens the tablet checker and tablet scheduler intervals to 100 ms. */ + @Override + protected void beforeCreatingConnectContext() throws Exception { + Config.enable_debug_points = true; + Config.disable_balance = true; + Config.disable_tablet_scheduler = true; + Config.allow_replica_on_same_host = true; +
FeConstants.tablet_checker_interval_ms = 100; + FeConstants.tablet_schedule_interval_ms = 100; + } + /** Creates the test database in which prepareTableForTest() creates its tables. */ + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + } + /** Starts two backends so that test tables can be created with replication_num = 2. */ + @Override + protected int backendNum() { + return 2; + } + /** Marks one replica as failed at the next version, lets the tablet scheduler run, and expects the scheduler (the clone path, per the test name) to repair it: afterwards the replica version equals the partition visible version and lastFailedVersion is reset to -1. */ + @Test + public void testRepairLastFailedVersionByClone() throws Exception { + TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_clone"); + Partition partition = info.partition; + Replica replica = info.replica; + + replica.updateLastFailedVersion(replica.getVersion() + 1); + Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion()); + /* Enable the tablet scheduler for about one second, then disable it again. NOTE(review): a fixed sleep may be flaky on slow CI machines; polling until lastFailedVersion becomes -1 (with a timeout) would be more robust. The flag is also left enabled if the sleep throws, since it is not restored in a finally block. */ + Config.disable_tablet_scheduler = false; + Thread.sleep(1000); + Config.disable_tablet_scheduler = true; + + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + } + /** Marks one replica as failed at the next version, then sends a tablet report from its backend carrying the current replica version, and expects the report to reset lastFailedVersion to -1 while the replica version still equals the partition visible version. */ + @Test + public void testRepairLastFailedVersionByReport() throws Exception { + TableInfo info = prepareTableForTest("tbl_repair_last_fail_version_by_report"); + Partition partition = info.partition; + Tablet tablet = info.tablet; + Replica replica = info.replica; + + replica.updateLastFailedVersion(replica.getVersion() + 1); + Assertions.assertEquals(partition.getCommittedVersion() + 1, replica.getLastFailedVersion()); + /* Build a single-tablet report whose fields mirror the FE-side replica metadata. */ + TTabletInfo tTabletInfo = new TTabletInfo(); + tTabletInfo.setTabletId(tablet.getId()); + tTabletInfo.setSchemaHash(replica.getSchemaHash()); + tTabletInfo.setVersion(replica.getVersion()); + tTabletInfo.setPathHash(replica.getPathHash()); + tTabletInfo.setPartitionId(partition.getId()); + tTabletInfo.setReplicaId(replica.getId()); + + TTablet tTablet = new TTablet(); + tTablet.addToTabletInfos(tTabletInfo); + Map<Long, TTablet> tablets = Maps.newHashMap(); + tablets.put(tablet.getId(), tTablet); + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + + ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); +
Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + } + /** Sends tablet reports whose version (1) is lower than the FE replica version (2). Without a debug point, the report leaves lastFailedVersion at -1. With the debug point Replica.regressive_version_immediately enabled, the same report marks the replica as failed at its version + 1. After the second report, the replica version still equals the partition visible version. */ + @Test + public void testVersionRegressive() throws Exception { + TableInfo info = prepareTableForTest("tbl_version_regressive"); + Partition partition = info.partition; + Tablet tablet = info.tablet; + Replica replica = info.replica; + + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + Assertions.assertTrue(replica.getVersion() > 1L); + + TTabletInfo tTabletInfo = new TTabletInfo(); + tTabletInfo.setTabletId(tablet.getId()); + tTabletInfo.setSchemaHash(replica.getSchemaHash()); + tTabletInfo.setVersion(1L); // BE reports version 1, which is lower than the FE replica version + tTabletInfo.setPathHash(replica.getPathHash()); + tTabletInfo.setPartitionId(partition.getId()); + tTabletInfo.setReplicaId(replica.getId()); + + TTablet tTablet = new TTablet(); + tTablet.addToTabletInfos(tTabletInfo); + Map<Long, TTablet> tablets = Maps.newHashMap(); + tablets.put(tablet.getId(), tTablet); + /* Without the debug point, a single regressive report does not mark the replica as failed. */ + ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + /* NOTE(review): this debug point is never removed in this class. Since DebugPointUtil is accessed statically, it presumably stays active for any later test in the same JVM; consider removing it in a finally block or an AfterEach method. */ + DebugPointUtil.addDebugPoint("Replica.regressive_version_immediately", new DebugPoint()); + ReportHandler.tabletReport(replica.getBackendId(), tablets, 100L); + Assertions.assertEquals(replica.getVersion() + 1, replica.getLastFailedVersion()); + + Assertions.assertEquals(partition.getVisibleVersion(), replica.getVersion()); + } + /** Creates an INT-keyed table with the given name in the test database, using one hash bucket and replication_num = 2. Pins the first partition to visible version 2 (next version 3) and every replica of the first tablet to version 2, then checks that the first replica has no failed version. @param tableName name of the table to create in the test database @return handles to the first partition, its first tablet and that tablet's first replica */ + private TableInfo prepareTableForTest(String tableName) throws Exception { + createTable("CREATE TABLE test." 
+ tableName + " (k INT) DISTRIBUTED BY HASH(k) " + " BUCKETS 1 PROPERTIES ( \"replication_num\" = \"2\" )"); + /* The database is looked up under its default_cluster: prefixed name. */ + Database db = Env.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:test"); + OlapTable tbl = (OlapTable) db.getTableOrMetaException(tableName); + Assertions.assertNotNull(tbl); + Partition partition = tbl.getPartitions().iterator().next(); + Tablet tablet = partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL).iterator().next() + .getTablets().iterator().next(); + /* Pin a known version state for the tests. The trailing 1L arguments of updateVersionInfo are placeholder values (presumably size and row-count statistics; confirm against Replica). */ + long visibleVersion = 2L; + partition.updateVisibleVersion(visibleVersion); + partition.setNextVersion(visibleVersion + 1); + tablet.getReplicas().forEach(replica -> replica.updateVersionInfo(visibleVersion, 1L, 1L, 1L)); + + Replica replica = tablet.getReplicas().iterator().next(); + Assertions.assertEquals(visibleVersion, replica.getVersion()); + Assertions.assertEquals(-1L, replica.getLastFailedVersion()); + + TableInfo info = new TableInfo(); + info.partition = partition; + info.tablet = tablet; + info.replica = replica; + + return info; + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java index 2845cec9225..0a68885bf26 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/DebugPointUtilTest.java @@ -18,6 +18,7 @@ package org.apache.doris.common.util; import org.apache.doris.common.Config; +import org.apache.doris.common.util.DebugPointUtil.DebugPoint; import org.apache.doris.http.DorisHttpTestCase; import okhttp3.Request; @@ -54,6 +55,23 @@ public class DebugPointUtilTest extends DorisHttpTestCase { Assert.assertTrue(DebugPointUtil.isEnable("dbug4")); Thread.sleep(1000); Assert.assertFalse(DebugPointUtil.isEnable("dbug4")); + + sendRequest("/api/debug_point/add/dbug5?v1=1&v2=a&v3=1.2&v4=true&v5=false"); +
Assert.assertTrue(DebugPointUtil.isEnable("dbug5")); + DebugPoint debugPoint = DebugPointUtil.getDebugPoint("dbug5"); + Assert.assertNotNull(debugPoint); + Assert.assertEquals(1, (int) debugPoint.param("v1", 0)); + Assert.assertEquals("a", debugPoint.param("v2", "")); + Assert.assertEquals(1.2, debugPoint.param("v3", 0.0), 1e-6); + Assert.assertTrue(debugPoint.param("v4", false)); + Assert.assertFalse(debugPoint.param("v5", false)); + Assert.assertEquals(123L, (long) debugPoint.param("v_no_exist", 123L)); + + Assert.assertEquals(1, (int) DebugPointUtil.getDebugParamOrDefault("dbug5", "v1", 0)); + Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("point_not_exists", "v1", 100)); + + sendRequest("/api/debug_point/add/dbug6?value=100"); + Assert.assertEquals(100, (int) DebugPointUtil.getDebugParamOrDefault("dbug6", 0)); } private void sendRequest(String uri) throws Exception { diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index a17fcdf72bc..ac4fa2660db 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -392,7 +392,7 @@ public abstract class TestWithFeService { InterruptedException { int feRpcPort = startFEServer(runningDir); List<Backend> bes = Lists.newArrayList(); - System.out.println("start create backend"); + System.out.println("start create backend, backend num " + backendNum); for (int i = 0; i < backendNum; i++) { bes.add(createBackend("127.0.0.1", feRpcPort)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org