This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 920c89c3f4a [enhancement](cloud) reconnect after the RPC request to 
the meta service fails (#45668) (#46358)
920c89c3f4a is described below

commit 920c89c3f4a0f79b71647b691d8e95a69553b037
Author: Luwei <lu...@selectdb.com>
AuthorDate: Fri Jan 3 21:11:18 2025 +0800

    [enhancement](cloud) reconnect after the RPC request to the meta service 
fails (#45668) (#46358)
    
    pick #45668
    ---------
    Co-authored-by: Gavin Chou <gavineaglec...@gmail.com>
---
 be/src/cloud/cloud_meta_mgr.cpp                    | 191 ++++++++++++++-------
 be/src/cloud/config.cpp                            |   4 +
 be/src/cloud/config.h                              |   4 +
 .../main/java/org/apache/doris/common/Config.java  |   4 +
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   |  52 +++++-
 5 files changed, 191 insertions(+), 64 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index 55ffe46286c..747baea780c 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -139,7 +139,71 @@ class MetaServiceProxy {
 public:
     static Status get_client(std::shared_ptr<MetaService_Stub>* stub) {
         TEST_SYNC_POINT_RETURN_WITH_VALUE("MetaServiceProxy::get_client", 
Status::OK(), stub);
-        return get_pooled_client(stub);
+        return get_pooled_client(stub, nullptr);
+    }
+
+    static Status get_proxy(MetaServiceProxy** proxy) {
+        // The 'stub' is a useless parameter, added only to reuse the 
`get_pooled_client` function.
+        std::shared_ptr<MetaService_Stub> stub;
+        return get_pooled_client(&stub, proxy);
+    }
+
+    void set_unhealthy() {
+        std::unique_lock lock(_mutex);
+        maybe_unhealthy = true;
+    }
+
+    bool need_reconn(long now) {
+        return maybe_unhealthy && ((now - last_reconn_time_ms.front()) >
+                                   
config::meta_service_rpc_reconnect_interval_ms);
+    }
+
+    Status get(std::shared_ptr<MetaService_Stub>* stub) {
+        using namespace std::chrono;
+
+        auto now = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
+        {
+            std::shared_lock lock(_mutex);
+            if (_deadline_ms >= now && !is_idle_timeout(now) && 
!need_reconn(now)) {
+                _last_access_at_ms.store(now, std::memory_order_relaxed);
+                *stub = _stub;
+                return Status::OK();
+            }
+        }
+
+        auto channel = std::make_unique<brpc::Channel>();
+        Status s = init_channel(channel.get());
+        if (!s.ok()) [[unlikely]] {
+            return s;
+        }
+
+        *stub = std::make_shared<MetaService_Stub>(channel.release(),
+                                                   
google::protobuf::Service::STUB_OWNS_CHANNEL);
+
+        long deadline = now;
+        // connection age only works without list endpoint.
+        if (!is_meta_service_endpoint_list() &&
+            config::meta_service_connection_age_base_seconds > 0) {
+            std::default_random_engine rng(static_cast<uint32_t>(now));
+            std::uniform_int_distribution<> uni(
+                    config::meta_service_connection_age_base_seconds,
+                    config::meta_service_connection_age_base_seconds * 2);
+            deadline = now + 
duration_cast<milliseconds>(seconds(uni(rng))).count();
+        } else {
+            deadline = LONG_MAX;
+        }
+
+        // Last one WIN
+        std::unique_lock lock(_mutex);
+        _last_access_at_ms.store(now, std::memory_order_relaxed);
+        _deadline_ms = deadline;
+        _stub = *stub;
+
+        last_reconn_time_ms.push(now);
+        last_reconn_time_ms.pop();
+        maybe_unhealthy = false;
+
+        return Status::OK();
     }
 
 private:
@@ -147,7 +211,17 @@ private:
         return config::meta_service_endpoint.find(',') != std::string::npos;
     }
 
-    static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub) {
+    /**
+    * This function initializes a pool of `MetaServiceProxy` objects and 
selects one using
+    * round-robin. It returns a client stub via the selected proxy.
+    *
+    * @param stub A pointer to a shared pointer of `MetaService_Stub` to be 
retrieved.
+    * @param proxy (Optional) A pointer to store the selected 
`MetaServiceProxy`.
+    *
+    * @return Status Returns `Status::OK()` on success or an error status on 
failure.
+    */
+    static Status get_pooled_client(std::shared_ptr<MetaService_Stub>* stub,
+                                    MetaServiceProxy** proxy) {
         static std::once_flag proxies_flag;
         static size_t num_proxies = 1;
         static std::atomic<size_t> index(0);
@@ -164,10 +238,16 @@ private:
         for (size_t i = 0; i + 1 < num_proxies; ++i) {
             size_t next_index = index.fetch_add(1, std::memory_order_relaxed) 
% num_proxies;
             Status s = proxies[next_index].get(stub);
+            if (proxy != nullptr) {
+                *proxy = &(proxies[next_index]);
+            }
             if (s.ok()) return Status::OK();
         }
 
         size_t next_index = index.fetch_add(1, std::memory_order_relaxed) % 
num_proxies;
+        if (proxy != nullptr) {
+            *proxy = &(proxies[next_index]);
+        }
         return proxies[next_index].get(stub);
     }
 
@@ -220,53 +300,13 @@ private:
                _last_access_at_ms.load(std::memory_order_relaxed) + 
idle_timeout_ms < now;
     }
 
-    Status get(std::shared_ptr<MetaService_Stub>* stub) {
-        using namespace std::chrono;
-
-        auto now = 
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
-        {
-            std::shared_lock lock(_mutex);
-            if (_deadline_ms >= now && !is_idle_timeout(now)) {
-                _last_access_at_ms.store(now, std::memory_order_relaxed);
-                *stub = _stub;
-                return Status::OK();
-            }
-        }
-
-        auto channel = std::make_unique<brpc::Channel>();
-        Status s = init_channel(channel.get());
-        if (!s.ok()) [[unlikely]] {
-            return s;
-        }
-
-        *stub = std::make_shared<MetaService_Stub>(channel.release(),
-                                                   
google::protobuf::Service::STUB_OWNS_CHANNEL);
-
-        long deadline = now;
-        // connection age only works without list endpoint.
-        if (!is_meta_service_endpoint_list() &&
-            config::meta_service_connection_age_base_seconds > 0) {
-            std::default_random_engine rng(static_cast<uint32_t>(now));
-            std::uniform_int_distribution<> uni(
-                    config::meta_service_connection_age_base_seconds,
-                    config::meta_service_connection_age_base_seconds * 2);
-            deadline = now + 
duration_cast<milliseconds>(seconds(uni(rng))).count();
-        } else {
-            deadline = LONG_MAX;
-        }
-
-        // Last one WIN
-        std::unique_lock lock(_mutex);
-        _last_access_at_ms.store(now, std::memory_order_relaxed);
-        _deadline_ms = deadline;
-        _stub = *stub;
-        return Status::OK();
-    }
-
     std::shared_mutex _mutex;
     std::atomic<long> _last_access_at_ms {0};
     long _deadline_ms {0};
     std::shared_ptr<MetaService_Stub> _stub;
+
+    std::queue<long> last_reconn_time_ms {std::deque<long> {0, 0, 0}};
+    bool maybe_unhealthy = false;
 };
 
 template <typename T, typename... Ts>
@@ -323,9 +363,11 @@ Status retry_rpc(std::string_view op_name, const Request& 
req, Response* res,
     std::default_random_engine rng = make_random_engine();
     std::uniform_int_distribution<uint32_t> u(20, 200);
     std::uniform_int_distribution<uint32_t> u2(500, 1000);
-    std::shared_ptr<MetaService_Stub> stub;
-    RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
+    MetaServiceProxy* proxy;
+    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
     while (true) {
+        std::shared_ptr<MetaService_Stub> stub;
+        RETURN_IF_ERROR(proxy->get(&stub));
         brpc::Controller cntl;
         cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
         cntl.set_max_retry(kBrpcRetryTimes);
@@ -333,6 +375,7 @@ Status retry_rpc(std::string_view op_name, const Request& 
req, Response* res,
         (stub.get()->*method)(&cntl, &req, res, nullptr);
         if (cntl.Failed()) [[unlikely]] {
             error_msg = cntl.ErrorText();
+            proxy->set_unhealthy();
         } else if (res->status().code() == MetaServiceCode::OK) {
             return Status::OK();
         } else if (res->status().code() == MetaServiceCode::INVALID_ARGUMENT) {
@@ -388,11 +431,12 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* 
tablet, bool warmup_delta_
 
     TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets", 
Status::OK(), tablet);
 
-    std::shared_ptr<MetaService_Stub> stub;
-    RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
-
+    MetaServiceProxy* proxy;
+    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
     int tried = 0;
     while (true) {
+        std::shared_ptr<MetaService_Stub> stub;
+        RETURN_IF_ERROR(proxy->get(&stub));
         brpc::Controller cntl;
         cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms);
         GetRowsetRequest req;
@@ -430,6 +474,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* 
tablet, bool warmup_delta_
         _get_rowset_latency << latency;
         int retry_times = config::meta_service_rpc_retry_times;
         if (cntl.Failed()) {
+            proxy->set_unhealthy();
             if (tried++ < retry_times) {
                 auto rng = make_random_engine();
                 std::uniform_int_distribution<uint32_t> u(20, 200);
@@ -633,15 +678,10 @@ Status 
CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
         *delete_bitmap = *new_delete_bitmap;
     }
 
-    std::shared_ptr<MetaService_Stub> stub;
-    RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));
-
     int64_t new_max_version = std::max(old_max_version, 
rs_metas.rbegin()->end_version());
-    brpc::Controller cntl;
     // When there are many delete bitmaps that need to be synchronized, it
     // may take a longer time, especially when loading the tablet for the
     // first time, so set a relatively long timeout time.
-    cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
     GetDeleteBitmapRequest req;
     GetDeleteBitmapResponse res;
     req.set_cloud_unique_id(config::cloud_unique_id);
@@ -669,10 +709,43 @@ Status 
CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
     }
 
     VLOG_DEBUG << "send GetDeleteBitmapRequest: " << req.ShortDebugString();
-    stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
-    if (cntl.Failed()) {
-        return Status::RpcError("failed to get delete bitmap: {}", 
cntl.ErrorText());
+
+    int retry_times = 0;
+    MetaServiceProxy* proxy;
+    RETURN_IF_ERROR(MetaServiceProxy::get_proxy(&proxy));
+    auto start = std::chrono::high_resolution_clock::now();
+    while (true) {
+        std::shared_ptr<MetaService_Stub> stub;
+        RETURN_IF_ERROR(proxy->get(&stub));
+        // When there are many delete bitmaps that need to be synchronized, it
+        // may take a longer time, especially when loading the tablet for the
+        // first time, so set a relatively long timeout time.
+        brpc::Controller cntl;
+        cntl.set_timeout_ms(3 * config::meta_service_brpc_timeout_ms);
+        cntl.set_max_retry(kBrpcRetryTimes);
+        res.Clear();
+        stub->get_delete_bitmap(&cntl, &req, &res, nullptr);
+        if (cntl.Failed()) [[unlikely]] {
+            LOG_INFO("failed to get delete bitmap")
+                    .tag("reason", cntl.ErrorText())
+                    .tag("tablet_id", tablet->tablet_id())
+                    .tag("partition_id", tablet->partition_id())
+                    .tag("tried", retry_times);
+            proxy->set_unhealthy();
+        } else {
+            break;
+        }
+
+        if (++retry_times > config::delete_bitmap_rpc_retry_times) {
+            if (cntl.Failed()) {
+                return Status::RpcError("failed to get delete bitmap, 
tablet={} err={}",
+                                        tablet->tablet_id(), cntl.ErrorText());
+            }
+            break;
+        }
     }
+    auto end = std::chrono::high_resolution_clock::now();
+
     if (res.status().code() == MetaServiceCode::TABLET_NOT_FOUND) {
         return Status::NotFound("failed to get delete bitmap: {}", 
res.status().msg());
     }
@@ -722,7 +795,7 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* 
tablet, int64_t old_
                 {rst_id, segment_ids[i], vers[i]},
                 roaring::Roaring::readSafe(delete_bitmaps[i].data(), 
delete_bitmaps[i].length()));
     }
-    int64_t latency = cntl.latency_us();
+    int64_t latency = 
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
     if (latency > 100 * 1000) { // 100ms
         LOG(INFO) << "finish get_delete_bitmap rpc. rowset_ids.size()=" << 
rowset_ids.size()
                   << ", delete_bitmaps.size()=" << delete_bitmaps.size() << ", 
latency=" << latency
diff --git a/be/src/cloud/config.cpp b/be/src/cloud/config.cpp
index feb81d0a074..1b568741442 100644
--- a/be/src/cloud/config.cpp
+++ b/be/src/cloud/config.cpp
@@ -77,5 +77,9 @@ DEFINE_mInt32(tablet_txn_info_min_expired_seconds, "120");
 DEFINE_mBool(enable_use_cloud_unique_id_from_fe, "true");
 
 DEFINE_mBool(enable_cloud_tablet_report, "false");
+
+DEFINE_mInt32(delete_bitmap_rpc_retry_times, "25");
+
+DEFINE_mInt64(meta_service_rpc_reconnect_interval_ms, "5000");
 #include "common/compile_check_end.h"
 } // namespace doris::config
diff --git a/be/src/cloud/config.h b/be/src/cloud/config.h
index a8a7c0c48ec..50f058bf8b0 100644
--- a/be/src/cloud/config.h
+++ b/be/src/cloud/config.h
@@ -111,5 +111,9 @@ DECLARE_mBool(enable_use_cloud_unique_id_from_fe);
 
 DECLARE_Bool(enable_cloud_tablet_report);
 
+DECLARE_mInt32(delete_bitmap_rpc_retry_times);
+
+DECLARE_mInt64(meta_service_rpc_reconnect_interval_ms);
+
 #include "common/compile_check_end.h"
 } // namespace doris::config
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 2f61f8242eb..a9a86601002 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3248,4 +3248,8 @@ public class Config extends ConfigBase {
             "For disabling certain SQL queries, the configuration item is a 
list of simple class names of AST"
                     + "(for example CreateRepositoryStmt, 
CreatePolicyCommand), separated by commas."})
     public static String block_sql_ast_names = "";
+
+    public static long meta_service_rpc_reconnect_interval_ms = 5000;
+
+    public static long meta_service_rpc_retry_cnt = 10;
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index d7f718e3ca4..2a69132e44b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -27,7 +27,10 @@ import io.grpc.StatusRuntimeException;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
@@ -41,6 +44,7 @@ public class MetaServiceProxy {
     // use concurrent map to allow access serviceMap in multi thread.
     private ReentrantLock lock = new ReentrantLock();
     private final Map<String, MetaServiceClient> serviceMap;
+    private Queue<Long> lastConnTimeMs = new LinkedList<>();
 
     static {
         if (Config.isCloudMode() && (Config.meta_service_endpoint == null || 
Config.meta_service_endpoint.isEmpty())) {
@@ -50,6 +54,9 @@ public class MetaServiceProxy {
 
     public MetaServiceProxy() {
         this.serviceMap = Maps.newConcurrentMap();
+        for (int i = 0; i < 3; ++i) {
+            lastConnTimeMs.add(0L);
+        }
     }
 
     private static class SingletonHolder {
@@ -77,6 +84,16 @@ public class MetaServiceProxy {
         return MetaServiceProxy.SingletonHolder.get();
     }
 
+    public boolean needReconn() {
+        lock.lock();
+        try {
+            long now = System.currentTimeMillis();
+            return (now - lastConnTimeMs.element() > 
Config.meta_service_rpc_reconnect_interval_ms);
+        } finally {
+            lock.unlock();
+        }
+    }
+
     public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest 
request)
             throws RpcException {
         try {
@@ -138,6 +155,8 @@ public class MetaServiceProxy {
             if (service == null) {
                 service = new MetaServiceClient(address);
                 serviceMap.put(address, service);
+                lastConnTimeMs.add(System.currentTimeMillis());
+                lastConnTimeMs.remove();
             }
             return service;
         } finally {
@@ -150,6 +169,7 @@ public class MetaServiceProxy {
 
     public static class MetaServiceClientWrapper {
         private final MetaServiceProxy proxy;
+        private Random random = new Random();
 
         public MetaServiceClientWrapper(MetaServiceProxy proxy) {
             this.proxy = proxy;
@@ -157,18 +177,40 @@ public class MetaServiceProxy {
 
         public <Response> Response executeRequest(Function<MetaServiceClient, 
Response> function) throws RpcException {
             int tried = 0;
-            while (tried++ < 3) {
+            while (tried++ < Config.meta_service_rpc_retry_cnt) {
+                MetaServiceClient client = null;
                 try {
-                    MetaServiceClient client = proxy.getProxy();
+                    client = proxy.getProxy();
                     return function.apply(client);
                 } catch (StatusRuntimeException sre) {
-                    if (sre.getStatus().getCode() == Status.Code.UNAVAILABLE 
|| tried == 3) {
+                    LOG.info("failed to request meta service code {}, msg {}, 
trycnt {}", sre.getStatus().getCode(),
+                            sre.getMessage(), tried);
+                    if (tried >= Config.meta_service_rpc_retry_cnt
+                            || (sre.getStatus().getCode() != 
Status.Code.UNAVAILABLE
+                                && sre.getStatus().getCode() != 
Status.Code.UNKNOWN)) {
                         throw new RpcException("", sre.getMessage(), sre);
                     }
                 } catch (Exception e) {
-                    throw new RpcException("", e.getMessage(), e);
+                    LOG.info("failed to request meta service trycnt {}", 
tried, e);
+                    if (tried >= Config.meta_service_rpc_retry_cnt) {
+                        throw new RpcException("", e.getMessage(), e);
+                    }
                 } catch (Throwable t) {
-                    throw new RpcException("", t.getMessage());
+                    LOG.info("failed to request meta service trycnt {}", 
tried, t);
+                    if (tried >= Config.meta_service_rpc_retry_cnt) {
+                        throw new RpcException("", t.getMessage());
+                    }
+                }
+
+                if (proxy.needReconn() && client != null) {
+                    client.shutdown(true);
+                }
+
+                int delay = 20 + random.nextInt(200 - 20 + 1);
+                try {
+                    Thread.sleep(delay);
+                } catch (InterruptedException interruptedException) {
+                    // ignore
                 }
             }
             return null; // impossible and unreachable, just make the compiler 
happy


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to