This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ae0915721b9 [fix](cloud) Check instance_id valid when use cloud_unique_id degrade format #43253 (#43832)
ae0915721b9 is described below

commit ae0915721b9987897d97c6b6c56faa5363d427fb
Author: deardeng <deng...@selectdb.com>
AuthorDate: Wed Nov 13 13:48:42 2024 +0800

    [fix](cloud) Check instance_id valid when use cloud_unique_id degrade format #43253 (#43832)
    
    cherry pick from #43253
---
 cloud/src/common/config.h                        |  2 +
 cloud/src/meta-service/meta_service.cpp          | 56 ++++++++++++++----------
 cloud/src/meta-service/meta_service_resource.cpp | 18 ++++++--
 cloud/src/resource-manager/resource_manager.cpp  | 34 +++++++++++++-
 cloud/src/resource-manager/resource_manager.h    | 19 ++++++++
 cloud/test/fdb_injection_test.cpp                |  1 +
 cloud/test/meta_service_test.cpp                 | 16 +++++++
 7 files changed, 117 insertions(+), 29 deletions(-)

diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h
index bd255b013e3..9fe98c16510 100644
--- a/cloud/src/common/config.h
+++ b/cloud/src/common/config.h
@@ -220,4 +220,6 @@ CONF_Int32(txn_lazy_commit_num_threads, "8");
 CONF_Int32(txn_lazy_max_rowsets_per_batch, "1000");
 // max TabletIndexPB num for batch get
 CONF_Int32(max_tablet_index_num_per_batch, "1000");
+
+CONF_Bool(enable_check_instance_id, "true");
 } // namespace doris::cloud::config
diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp
index a59869196e3..69740dbf49a 100644
--- a/cloud/src/meta-service/meta_service.cpp
+++ b/cloud/src/meta-service/meta_service.cpp
@@ -88,46 +88,54 @@ std::string get_instance_id(const std::shared_ptr<ResourceManager>& rc_mgr,
     std::vector<NodeInfo> nodes;
     std::string err = rc_mgr->get_node(cloud_unique_id, &nodes);
     { TEST_SYNC_POINT_CALLBACK("get_instance_id_err", &err); }
+    std::string instance_id;
     if (!err.empty()) {
         // cache can't find cloud_unique_id, so degraded by parse cloud_unique_id
         // cloud_unique_id encode: ${version}:${instance_id}:${unique_id}
         // check it split by ':' c
-        auto vec = split(cloud_unique_id, ':');
-        std::stringstream ss;
-        for (int i = 0; i < vec.size(); ++i) {
-            ss << "idx " << i << "= [" << vec[i] << "] ";
-        }
-        LOG(INFO) << "degraded to get instance_id, cloud_unique_id: " << cloud_unique_id
-                  << "after split: " << ss.str();
-        if (vec.size() != 3) {
-            LOG(WARNING) << "cloud unique id is not degraded format, failed to check instance "
-                            "info, cloud_unique_id="
-                         << cloud_unique_id << " , err=" << err;
+        auto [valid, id] = ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
+        if (!valid) {
+            LOG(WARNING) << "use degraded format cloud_unique_id, but cloud_unique_id not degrade "
+                            "format, cloud_unique_id="
+                         << cloud_unique_id;
             return "";
         }
-        // version: vec[0], instance_id: vec[1], unique_id: vec[2]
-        switch (std::atoi(vec[0].c_str())) {
-        case 1:
-            // just return instance id;
-            return vec[1];
-        default:
-            LOG(WARNING) << "cloud unique id degraded state, but version not eq configure, "
+
+        // check instance_id valid by get fdb
+        if (config::enable_check_instance_id && !rc_mgr->is_instance_id_registered(id)) {
+            LOG(WARNING) << "use degraded format cloud_unique_id, but check instance failed, "
                             "cloud_unique_id="
-                         << cloud_unique_id << ", err=" << err;
+                         << cloud_unique_id;
             return "";
         }
+        return id;
     }
 
-    std::string instance_id;
-    for (auto& i : nodes) {
-        if (!instance_id.empty() && instance_id != i.instance_id) {
+    for (auto& node : nodes) {
+        if (!instance_id.empty() && instance_id != node.instance_id) {
             LOG(WARNING) << "cloud_unique_id is one-to-many instance_id, "
                          << " cloud_unique_id=" << cloud_unique_id
                          << " current_instance_id=" << instance_id
-                         << " later_instance_id=" << i.instance_id;
+                         << " later_instance_id=" << node.instance_id;
+        }
+        instance_id = node.instance_id; // The last wins
+        // check cache unique_id
+        std::string cloud_unique_id_in_cache = node.node_info.cloud_unique_id();
+        auto [valid, id] =
+                ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id_in_cache);
+        if (!valid) {
+            continue;
+        }
+
+        if (id != node.instance_id || id != instance_id) {
+            LOG(WARNING) << "in cache, node=" << node.node_info.DebugString()
+                         << ", cloud_unique_id=" << cloud_unique_id
+                         << " current_instance_id=" << instance_id
+                         << ", later_instance_id=" << node.instance_id;
+            continue;
         }
-        instance_id = i.instance_id; // The last wins
     }
+
     return instance_id;
 }
 
diff --git a/cloud/src/meta-service/meta_service_resource.cpp b/cloud/src/meta-service/meta_service_resource.cpp
index 5d4a4d69227..92020005c3a 100644
--- a/cloud/src/meta-service/meta_service_resource.cpp
+++ b/cloud/src/meta-service/meta_service_resource.cpp
@@ -1943,6 +1943,16 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
     std::string cloud_unique_id = request->has_cloud_unique_id() ? request->cloud_unique_id() : "";
     instance_id = request->has_instance_id() ? request->instance_id() : "";
     if (!cloud_unique_id.empty() && instance_id.empty()) {
+        auto [is_degraded_format, id] =
+                ResourceManager::get_instance_id_by_cloud_unique_id(cloud_unique_id);
+        if (config::enable_check_instance_id && is_degraded_format &&
+            !resource_mgr_->is_instance_id_registered(id)) {
+            msg = "use degrade cloud_unique_id, but instance_id invalid, cloud_unique_id=" +
+                  cloud_unique_id;
+            LOG(WARNING) << msg;
+            code = MetaServiceCode::INVALID_ARGUMENT;
+            return;
+        }
         instance_id = get_instance_id(resource_mgr_, cloud_unique_id);
         if (instance_id.empty()) {
             code = MetaServiceCode::INVALID_ARGUMENT;
@@ -1992,7 +2002,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
     case AlterClusterRequest::ADD_NODE: {
         resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
         if (msg != "") {
-            LOG(INFO) << msg;
+            LOG(WARNING) << msg;
             break;
         }
         std::vector<NodeInfo> to_add;
@@ -2016,7 +2026,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
     case AlterClusterRequest::DROP_NODE: {
         resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
         if (msg != "") {
-            LOG(INFO) << msg;
+            LOG(WARNING) << msg;
             break;
         }
         std::vector<NodeInfo> to_add;
@@ -2039,7 +2049,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
     case AlterClusterRequest::DECOMMISSION_NODE: {
         resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
         if (msg != "") {
-            LOG(INFO) << msg;
+            LOG(WARNING) << msg;
             break;
         }
 
@@ -2101,7 +2111,7 @@ void MetaServiceImpl::alter_cluster(google::protobuf::RpcController* controller,
     case AlterClusterRequest::NOTIFY_DECOMMISSIONED: {
         resource_mgr_->check_cluster_params_valid(request->cluster(), &msg, false);
         if (msg != "") {
-            LOG(INFO) << msg;
+            LOG(WARNING) << msg;
             break;
         }
 
diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp
index 43f0a7368d8..9c37d781765 100644
--- a/cloud/src/resource-manager/resource_manager.cpp
+++ b/cloud/src/resource-manager/resource_manager.cpp
@@ -23,6 +23,7 @@
 #include <sstream>
 
 #include "common/logging.h"
+#include "common/string_util.h"
 #include "common/util.h"
 #include "cpp/sync_point.h"
 #include "meta-service/keys.h"
@@ -159,6 +160,16 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
     int master_num = 0;
     int follower_num = 0;
     for (auto& n : cluster.nodes()) {
+        // check here cloud_unique_id
+        std::string cloud_unique_id = n.cloud_unique_id();
+        auto [is_degrade_format, instance_id] = get_instance_id_by_cloud_unique_id(cloud_unique_id);
+        if (config::enable_check_instance_id && is_degrade_format &&
+            !is_instance_id_registered(instance_id)) {
+            ss << "node=" << n.DebugString()
+               << " cloud_unique_id use degrade format, but check instance failed";
+            *err = ss.str();
+            return false;
+        }
         if (ClusterPB::SQL == cluster.type() && n.has_edit_log_port() && n.edit_log_port() &&
             n.has_node_type() &&
             (n.node_type() == NodeInfoPB_NodeType_FE_MASTER ||
@@ -199,6 +210,27 @@ bool ResourceManager::check_cluster_params_valid(const ClusterPB& cluster, std::
     return no_err;
 }
 
+std::pair<bool, std::string> ResourceManager::get_instance_id_by_cloud_unique_id(
+        const std::string& cloud_unique_id) {
+    auto v = split(cloud_unique_id, ':');
+    if (v.size() != 3) return {false, ""};
+    // degraded format check it
+    int version = std::atoi(v[0].c_str());
+    if (version != 1) return {false, ""};
+    return {true, v[1]};
+}
+
+bool ResourceManager::is_instance_id_registered(const std::string& instance_id) {
+    // check kv
+    auto [c0, m0] = get_instance(nullptr, instance_id, nullptr);
+    { TEST_SYNC_POINT_CALLBACK("is_instance_id_registered", &c0); }
+    if (c0 != TxnErrorCode::TXN_OK) {
+        LOG(WARNING) << "failed to check instance instance_id=" << instance_id
+                     << ", code=" << format_as(c0) << ", info=" + m0;
+    }
+    return c0 == TxnErrorCode::TXN_OK;
+}
+
 std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const std::string& instance_id,
                                                                      const ClusterInfo& cluster) {
     std::string msg;
@@ -624,7 +656,7 @@ std::pair<TxnErrorCode, std::string> ResourceManager::get_instance(std::shared_p
         return ec;
     }
 
-    if (!inst_pb->ParseFromString(val)) {
+    if (inst_pb != nullptr && !inst_pb->ParseFromString(val)) {
         code = TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
         msg = "failed to parse InstanceInfoPB";
         return ec;
diff --git a/cloud/src/resource-manager/resource_manager.h b/cloud/src/resource-manager/resource_manager.h
index 5000764dee8..9e6f4548d24 100644
--- a/cloud/src/resource-manager/resource_manager.h
+++ b/cloud/src/resource-manager/resource_manager.h
@@ -114,6 +114,25 @@ public:
     bool check_cluster_params_valid(const ClusterPB& cluster, std::string* err,
                                     bool check_master_num);
 
+    /**
+     * Check cloud_unique_id is degraded format, and get instance_id from cloud_unique_id
+     * degraded format : "${version}:${instance_id}:${unique_id}"
+     * @param degraded cloud_unique_id
+     *
+     * @return a <is_degraded_format, instance_id> pair, if is_degraded_format == true , instance_id, if is_degraded_format == false, instance_id=""
+     */
+    static std::pair<bool, std::string> get_instance_id_by_cloud_unique_id(
+            const std::string& cloud_unique_id);
+
+    /**
+     * check instance_id is a valid instance, check by get fdb kv 
+     *
+     * @param instance_id
+     *
+     * @return true, instance_id in fdb kv
+     */
+    bool is_instance_id_registered(const std::string& instance_id);
+
     /**
      * Refreshes the cache of given instance. This process removes the instance in cache
      * and then replaces it with persisted instance state read from underlying KV storage.
diff --git a/cloud/test/fdb_injection_test.cpp b/cloud/test/fdb_injection_test.cpp
index 125ae2b6b04..08ba3e50e52 100644
--- a/cloud/test/fdb_injection_test.cpp
+++ b/cloud/test/fdb_injection_test.cpp
@@ -70,6 +70,7 @@ int main(int argc, char** argv) {
     cloud::config::txn_store_retry_base_intervals_ms = 1;
     cloud::config::fdb_cluster_file_path = "fdb.cluster";
     cloud::config::write_schema_kv = true;
+    cloud::config::enable_check_instance_id = false;
 
     auto sp = SyncPoint::get_instance();
     sp->enable_processing();
diff --git a/cloud/test/meta_service_test.cpp b/cloud/test/meta_service_test.cpp
index e6b9e9dddee..c67b49aac3f 100644
--- a/cloud/test/meta_service_test.cpp
+++ b/cloud/test/meta_service_test.cpp
@@ -56,6 +56,7 @@ int main(int argc, char** argv) {
     config::enable_txn_store_retry = true;
     config::txn_store_retry_base_intervals_ms = 1;
     config::txn_store_retry_times = 20;
+    config::enable_check_instance_id = false;
 
     if (!doris::cloud::init_glog("meta_service_test")) {
         std::cerr << "failed to init glog" << std::endl;
@@ -264,6 +265,21 @@ TEST(MetaServiceTest, GetInstanceIdTest) {
                                   "12345678901:ALBJLH4Q:m-n3qdpyal27rh8iprxx");
     ASSERT_EQ(instance_id, "");
 
+    config::enable_check_instance_id = true;
+    auto ms = get_meta_service(false);
+    instance_id =
+            get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx");
+    ASSERT_EQ(instance_id, "");
+
+    sp->set_call_back("is_instance_id_registered", [&](auto&& args) {
+        TxnErrorCode* c0 = try_any_cast<TxnErrorCode*>(args[0]);
+        *c0 = TxnErrorCode::TXN_OK;
+    });
+    instance_id =
+            get_instance_id(ms->resource_mgr(), "1:ALBJLH4Q-check-invalid:m-n3qdpyal27rh8iprxx");
+    ASSERT_EQ(instance_id, "ALBJLH4Q-check-invalid");
+    config::enable_check_instance_id = false;
+
     sp->clear_all_call_backs();
     sp->clear_trace();
     sp->disable_processing();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to