This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7630b62e72f branch-3.0: [opt](meta-service) Implement set_value API
for meta-servce #49052 (#49359)
7630b62e72f is described below
commit 7630b62e72fb00611bc41014fae9c57e997328e4
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 24 11:32:02 2025 +0800
branch-3.0: [opt](meta-service) Implement set_value API for meta-servce
#49052 (#49359)
Cherry-picked from #49052
Co-authored-by: Gavin Chou <[email protected]>
---
cloud/src/common/util.cpp | 24 ++++
cloud/src/common/util.h | 6 +
cloud/src/meta-service/http_encode_key.cpp | 208 ++++++++++++++++++++++-----
cloud/src/meta-service/meta_service_http.cpp | 16 ++-
cloud/src/meta-service/meta_service_http.h | 3 +
cloud/test/meta_service_http_test.cpp | 73 ++++++++++
6 files changed, 289 insertions(+), 41 deletions(-)
diff --git a/cloud/src/common/util.cpp b/cloud/src/common/util.cpp
index 8d1c8fed983..50f29afb0ba 100644
--- a/cloud/src/common/util.cpp
+++ b/cloud/src/common/util.cpp
@@ -247,6 +247,30 @@ bool ValueBuf::to_pb(google::protobuf::Message* pb) const {
return pb->ParseFromZeroCopyStream(&merge_stream);
}
+// Returns the value bytes of every KV held in `iters`, concatenated in
+// iteration order. Returns an empty string when there is nothing to iterate.
+std::string ValueBuf::value() const {
+    std::string merged;
+    for (auto&& it : iters) {
+        it->reset(); // start from the first KV, as the other accessors of ValueBuf do
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+            // Append straight into the result: no const-stripping cast and no
+            // intermediate buffer that would have to be copied a second time.
+            merged.append(v.data(), v.size());
+        }
+    }
+    return merged;
+}
+
+// Collects a copy of the key of every KV held in `iters`, in iteration order.
+std::vector<std::string> ValueBuf::keys() const {
+    std::vector<std::string> collected;
+    for (const auto& iter : iters) {
+        iter->reset();
+        for (; iter->has_next();) {
+            auto [key, unused_value] = iter->next();
+            collected.emplace_back(key.data(), key.size());
+        }
+    }
+    return collected;
+}
+
void ValueBuf::remove(Transaction* txn) const {
for (auto&& it : iters) {
it->reset();
diff --git a/cloud/src/common/util.h b/cloud/src/common/util.h
index de37c2f4d9b..8f2e8aa077e 100644
--- a/cloud/src/common/util.h
+++ b/cloud/src/common/util.h
@@ -89,6 +89,12 @@ struct ValueBuf {
// Return TXN_OK for success get a key, TXN_KEY_NOT_FOUND for key not
found, otherwise for error.
TxnErrorCode get(Transaction* txn, std::string_view key, bool snapshot =
false);
+ // return the merged value in ValueBuf
+ std::string value() const;
+
+ // return all keys in ValueBuf; if the value is not split, the size of keys is 1
+ std::vector<std::string> keys() const;
+
std::vector<std::unique_ptr<RangeGetIterator>> iters;
int8_t ver {-1};
};
diff --git a/cloud/src/meta-service/http_encode_key.cpp
b/cloud/src/meta-service/http_encode_key.cpp
index 4d05f0121b0..728b52df2d8 100644
--- a/cloud/src/meta-service/http_encode_key.cpp
+++ b/cloud/src/meta-service/http_encode_key.cpp
@@ -15,9 +15,12 @@
// specific language governing permissions and limitations
// under the License.
+#include <brpc/controller.h>
#include <brpc/uri.h>
#include <fmt/format.h>
#include <gen_cpp/cloud.pb.h>
+#include <google/protobuf/message.h>
+#include <google/protobuf/util/json_util.h>
#include <bit>
#include <iomanip>
@@ -29,6 +32,7 @@
#include <utility>
#include <vector>
+#include "common/logging.h"
#include "common/util.h"
#include "cpp/sync_point.h"
#include "meta-service/codec.h"
@@ -80,6 +84,21 @@ struct KeyInfoSetter {
};
// clang-format on
+// Parses `json` into a newly allocated protobuf message of type `Message`,
+// calling JsonStringToMessage() with its default options.
+// Returns the parsed message, or nullptr after logging a warning that carries
+// the parser status and the input json when parsing fails.
+template <typename Message>
+static std::shared_ptr<Message> parse_json(const std::string& json) {
+    static_assert(std::is_base_of_v<google::protobuf::Message, Message>);
+    auto msg = std::make_shared<Message>();
+    auto status = google::protobuf::util::JsonStringToMessage(json, msg.get());
+    if (!status.ok()) {
+        std::string err = "failed to strictly parse json message, error: " + status.ToString();
+        LOG_WARNING(err).tag("json", json);
+        // A retry with JsonParseOptions::ignore_unknown_fields = true was sketched
+        // here at one point; it is not enabled.
+        return nullptr;
+    }
+    return msg;
+}
+
using param_type = const std::vector<const std::string*>;
template <class ProtoType>
@@ -177,41 +196,45 @@ static std::string parse_tablet_stats(const ValueBuf&
buf) {
}
// See keys.h to get all types of key, e.g: MetaRowsetKeyInfo
-// key_type -> {{param1, param2 ...}, key_encoding_func, value_parsing_func}
-// where params are the input for key_encoding_func
+// key_type -> {{param_name1, param_name2 ...}, key_encoding_func,
serialized_pb_to_json_parsing_func, json_to_proto_parsing_func}
+// where param names are the input for key_encoding_func
// clang-format off
+// key_type
static std::unordered_map<std::string_view,
- std::tuple<std::vector<std::string_view>,
std::function<std::string(param_type&)>, std::function<std::string(const
ValueBuf&)>>> param_set {
- {"InstanceKey", {{"instance_id"},
[](param_type& p) { return
instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get()); }
, parse<InstanceInfoPB>}} ,
- {"TxnLabelKey", {{"instance_id", "db_id", "label"},
[](param_type& p) { return
txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get()); }
, parse_txn_label}} ,
- {"TxnInfoKey", {{"instance_id", "db_id", "txn_id"},
[](param_type& p) { return
txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get()); }
, parse<TxnInfoPB>}} ,
- {"TxnIndexKey", {{"instance_id", "txn_id"},
[](param_type& p) { return
txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get()); }
, parse<TxnIndexPB>}} ,
- {"TxnRunningKey", {{"instance_id", "db_id", "txn_id"},
[](param_type& p) { return
txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get()); }
, parse<TxnRunningPB>}} ,
- {"PartitionVersionKey", {{"instance_id", "db_id", "tbl_id",
"partition_id"}, [](param_type& p) { return
partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get()); }
, parse<VersionPB>}} ,
- {"TableVersionKey", {{"instance_id", "db_id", "tbl_id"},
[](param_type& p) { return
table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get()); }
, parse<VersionPB>}} ,
- {"MetaRowsetKey", {{"instance_id", "tablet_id", "version"},
[](param_type& p) { return
meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get()); }
, parse<doris::RowsetMetaCloudPB>}} ,
- {"MetaRowsetTmpKey", {{"instance_id", "txn_id", "tablet_id"},
[](param_type& p) { return
meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get()); }
, parse<doris::RowsetMetaCloudPB>}} ,
- {"MetaTabletKey", {{"instance_id", "table_id", "index_id",
"part_id", "tablet_id"}, [](param_type& p) { return
meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get()); }
, parse<doris::TabletMetaCloudPB>}} ,
- {"MetaTabletIdxKey", {{"instance_id", "tablet_id"},
[](param_type& p) { return
meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get()); }
, parse<TabletIndexPB>}} ,
- {"RecycleIndexKey", {{"instance_id", "index_id"},
[](param_type& p) { return
recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get()); }
, parse<RecycleIndexPB>}} ,
- {"RecyclePartKey", {{"instance_id", "part_id"},
[](param_type& p) { return
recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get()); }
, parse<RecyclePartitionPB>}} ,
- {"RecycleRowsetKey", {{"instance_id", "tablet_id", "rowset_id"},
[](param_type& p) { return
recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get()); }
, parse<RecycleRowsetPB>}} ,
- {"RecycleTxnKey", {{"instance_id", "db_id", "txn_id"},
[](param_type& p) { return
recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get()); }
, parse<RecycleTxnPB>}} ,
- {"StatsTabletKey", {{"instance_id", "table_id", "index_id",
"part_id", "tablet_id"}, [](param_type& p) { return
stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get()); }
, parse_tablet_stats}} ,
- {"JobTabletKey", {{"instance_id", "table_id", "index_id",
"part_id", "tablet_id"}, [](param_type& p) { return
job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get()); }
, parse<TabletJobInfoPB>}} ,
- {"CopyJobKey", {{"instance_id", "stage_id", "table_id",
"copy_id", "group_id"}, [](param_type& p) { return
copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get()); }
, parse<CopyJobPB>}} ,
- {"CopyFileKey", {{"instance_id", "stage_id", "table_id",
"obj_key", "obj_etag"}, [](param_type& p) { return
copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get()); }
, parse<CopyFilePB>}} ,
- {"RecycleStageKey", {{"instance_id", "stage_id"},
[](param_type& p) { return
recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get()); }
, parse<RecycleStagePB>}} ,
- {"JobRecycleKey", {{"instance_id"},
[](param_type& p) { return
job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get()); }
, parse<JobRecyclePB>}} ,
- {"MetaSchemaKey", {{"instance_id", "index_id",
"schema_version"}, [](param_type& p) { return
meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get()); }
, parse_tablet_schema}} ,
- {"MetaDeleteBitmap", {{"instance_id", "tablet_id", "rowest_id",
"version", "seg_id"}, [](param_type& p) { return
meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get()); }
, parse_delete_bitmap}} ,
- {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id",
"partition_id"}, [](param_type& p) { return
meta_delete_bitmap_update_lock_key(
KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get()); },
parse<DeleteBitmapUpdateLockPB>}},
- {"MetaPendingDeleteBitmap", {{"instance_id", "tablet_id"},
[](param_type& p) { return
meta_pending_delete_bitmap_key(
KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get()); } ,
parse<PendingDeleteBitmapPB>}} ,
- {"RLJobProgressKey", {{"instance_id", "db_id", "job_id"},
[](param_type& p) { return rl_job_progress_key_info(
KeyInfoSetter<RLJobProgressKeyInfo>{p}.get()); } ,
parse<RoutineLoadProgressPB>}} ,
- {"MetaServiceRegistryKey", {std::vector<std::string_view> {},
[](param_type& p) { return
system_meta_service_registry_key(); }
, parse<ServiceRegistryPB>}} ,
- {"MetaServiceArnInfoKey", {std::vector<std::string_view> {},
[](param_type& p) { return
system_meta_service_arn_info_key(); }
, parse<RamUserPB>}} ,
- {"MetaServiceEncryptionKey", {std::vector<std::string_view> {},
[](param_type& p) { return
system_meta_service_encryption_key_info_key(); }
, parse<EncryptionKeyInfoPB>}} ,
- {"StorageVaultKey", {{"instance_id", "vault_id"},
[](param_type& p) { return
storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get()); }
, parse<StorageVaultPB>}} ,};
+ // params
key_encoding_func serialized_pb_to_json_parsing_func
json_to_proto_parsing_func
+ std::tuple<std::vector<std::string_view>,
std::function<std::string(param_type&)>, std::function<std::string(const
ValueBuf&)>, std::function<std::shared_ptr<google::protobuf::Message>(const
std::string&)>>> param_set {
+ {"InstanceKey", {{"instance_id"},
[](param_type& p) { return
instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get());
}, parse<InstanceInfoPB> , parse_json<InstanceInfoPB>}},
+ {"TxnLabelKey", {{"instance_id", "db_id", "label"},
[](param_type& p) { return
txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get());
}, parse_txn_label , parse_json<TxnLabelPB>}},
+ {"TxnInfoKey", {{"instance_id", "db_id", "txn_id"},
[](param_type& p) { return
txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get());
}, parse<TxnInfoPB> , parse_json<TxnInfoPB>}},
+ {"TxnIndexKey", {{"instance_id", "txn_id"},
[](param_type& p) { return
txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get());
}, parse<TxnIndexPB> , parse_json<TxnIndexPB>}},
+ {"TxnRunningKey", {{"instance_id", "db_id", "txn_id"},
[](param_type& p) { return
txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get());
}, parse<TxnRunningPB> , parse_json<TxnRunningPB>}},
+ {"PartitionVersionKey", {{"instance_id", "db_id", "tbl_id",
"partition_id"}, [](param_type& p) { return
partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get());
}, parse<VersionPB> , parse_json<VersionPB>}},
+ {"TableVersionKey", {{"instance_id", "db_id", "tbl_id"},
[](param_type& p) { return
table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get());
}, parse<VersionPB> , parse_json<VersionPB>}},
+ {"MetaRowsetKey", {{"instance_id", "tablet_id", "version"},
[](param_type& p) { return
meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get());
}, parse<doris::RowsetMetaCloudPB> ,
parse_json<doris::RowsetMetaCloudPB>}},
+ {"MetaRowsetTmpKey", {{"instance_id", "txn_id", "tablet_id"},
[](param_type& p) { return
meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get());
}, parse<doris::RowsetMetaCloudPB> ,
parse_json<doris::RowsetMetaCloudPB>}},
+ {"MetaTabletKey", {{"instance_id", "table_id", "index_id",
"part_id", "tablet_id"}, [](param_type& p) { return
meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get());
}, parse<doris::TabletMetaCloudPB> ,
parse_json<doris::TabletMetaCloudPB>}},
+ {"MetaTabletIdxKey", {{"instance_id", "tablet_id"},
[](param_type& p) { return
meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get());
}, parse<TabletIndexPB> , parse_json<TabletIndexPB>}},
+ {"RecycleIndexKey", {{"instance_id", "index_id"},
[](param_type& p) { return
recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get());
}, parse<RecycleIndexPB> , parse_json<RecycleIndexPB>}},
+ {"RecyclePartKey", {{"instance_id", "part_id"},
[](param_type& p) { return
recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get());
}, parse<RecyclePartitionPB> ,
parse_json<RecyclePartitionPB>}},
+ {"RecycleRowsetKey", {{"instance_id", "tablet_id", "rowset_id"},
[](param_type& p) { return
recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get());
}, parse<RecycleRowsetPB> , parse_json<RecycleRowsetPB>}},
+ {"RecycleTxnKey", {{"instance_id", "db_id", "txn_id"},
[](param_type& p) { return
recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get());
}, parse<RecycleTxnPB> , parse_json<RecycleTxnPB>}},
+ {"StatsTabletKey", {{"instance_id", "table_id", "index_id",
"part_id", "tablet_id"}, [](param_type& p) { return
stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get());
}, parse_tablet_stats , parse_json<TabletStatsPB>}},
+ {"JobTabletKey", {{"instance_id", "table_id", "index_id",
"part_id", "tablet_id"}, [](param_type& p) { return
job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get());
}, parse<TabletJobInfoPB> , parse_json<TabletJobInfoPB>}},
+ {"CopyJobKey", {{"instance_id", "stage_id", "table_id",
"copy_id", "group_id"}, [](param_type& p) { return
copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get());
}, parse<CopyJobPB> , parse_json<CopyJobPB>}},
+ {"CopyFileKey", {{"instance_id", "stage_id", "table_id",
"obj_key", "obj_etag"}, [](param_type& p) { return
copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get());
}, parse<CopyFilePB> , parse_json<CopyFilePB>}},
+ {"RecycleStageKey", {{"instance_id", "stage_id"},
[](param_type& p) { return
recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get());
}, parse<RecycleStagePB> , parse_json<RecycleStagePB>}},
+ {"JobRecycleKey", {{"instance_id"},
[](param_type& p) { return
job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get());
}, parse<JobRecyclePB> , parse_json<JobRecyclePB>}},
+ {"MetaSchemaKey", {{"instance_id", "index_id",
"schema_version"}, [](param_type& p) { return
meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get());
}, parse_tablet_schema ,
parse_json<doris::TabletSchemaCloudPB>}},
+ {"MetaDeleteBitmap", {{"instance_id", "tablet_id", "rowest_id",
"version", "seg_id"}, [](param_type& p) { return
meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get());
}, parse_delete_bitmap , parse_json<DeleteBitmapPB>}},
+ {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id",
"partition_id"}, [](param_type& p) { return
meta_delete_bitmap_update_lock_key(KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get());
}, parse<DeleteBitmapUpdateLockPB> , parse_json<DeleteBitmapUpdateLockPB>}},
+ {"MetaPendingDeleteBitmap", {{"instance_id", "tablet_id"},
[](param_type& p) { return
meta_pending_delete_bitmap_key(KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get());
}, parse<PendingDeleteBitmapPB> ,
parse_json<PendingDeleteBitmapPB>}},
+ {"RLJobProgressKey", {{"instance_id", "db_id", "job_id"},
[](param_type& p) { return
rl_job_progress_key_info(KeyInfoSetter<RLJobProgressKeyInfo>{p}.get());
}, parse<RoutineLoadProgressPB> ,
parse_json<RoutineLoadProgressPB>}},
+ {"StorageVaultKey", {{"instance_id", "vault_id"},
[](param_type& p) { return
storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get());
}, parse<StorageVaultPB> , parse_json<StorageVaultPB>}},
+ {"MetaSchemaPBDictionaryKey", {{"instance_id", "index_id"},
[](param_type& p) { return
meta_schema_pb_dictionary_key(KeyInfoSetter<MetaSchemaPBDictionaryInfo>{p}.get());
}, parse<SchemaCloudDictionary> ,
parse_json<SchemaCloudDictionary>}},
+ {"MetaServiceRegistryKey", {std::vector<std::string_view> {},
[](param_type& p) { return
system_meta_service_registry_key();
}, parse<ServiceRegistryPB> ,
parse_json<ServiceRegistryPB>}},
+ {"MetaServiceArnInfoKey", {std::vector<std::string_view> {},
[](param_type& p) { return
system_meta_service_arn_info_key();
}, parse<RamUserPB> , parse_json<RamUserPB>}},
+ {"MetaServiceEncryptionKey", {std::vector<std::string_view> {},
[](param_type& p) { return
system_meta_service_encryption_key_info_key();
}, parse<EncryptionKeyInfoPB> ,
parse_json<EncryptionKeyInfoPB>}},
+};
// clang-format on
static MetaServiceResponseStatus encode_key(const brpc::URI& uri, std::string&
key) {
@@ -225,9 +248,10 @@ static MetaServiceResponseStatus encode_key(const
brpc::URI& uri, std::string& k
(key_type.empty() ? "(empty)" : key_type)));
return status;
}
+ auto& key_params = std::get<0>(it->second);
std::remove_cv_t<param_type> params;
- params.reserve(std::get<0>(it->second).size());
- for (auto& i : std::get<0>(it->second)) {
+ params.reserve(key_params.size());
+ for (auto& i : key_params) {
auto p = uri.GetQuery(i.data());
if (p == nullptr || p->empty()) {
status.set_code(MetaServiceCode::INVALID_ARGUMENT);
@@ -236,7 +260,8 @@ static MetaServiceResponseStatus encode_key(const
brpc::URI& uri, std::string& k
}
params.emplace_back(p);
}
- key = std::get<1>(it->second)(params);
+ auto& key_encoding_function = std::get<1>(it->second);
+ key = key_encoding_function(params);
return status;
}
@@ -292,7 +317,8 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const
brpc::URI& uri) {
return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR,
fmt::format("failed to get kv, key={}",
hex(key)));
}
- auto readable_value = std::get<2>(it->second)(value);
+ auto& value_parsing_function = std::get<2>(it->second);
+ auto readable_value = value_parsing_function(value);
if (readable_value.empty()) [[unlikely]] {
return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR,
fmt::format("failed to parse value, key={}",
hex(key)));
@@ -300,6 +326,114 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const
brpc::URI& uri) {
return http_text_reply(MetaServiceCode::OK, "", readable_value);
}
+/// Overwrites the value stored under one key with the protobuf parsed from the
+/// JSON request body.
+///
+/// The key is the unhex'ed `key` query parameter or, when that parameter is
+/// empty, the key produced by encode_key() from the remaining query parameters.
+/// The `key_type` query parameter selects the param_set entry whose functions
+/// parse the JSON body and render the currently stored value as JSON.
+///
+/// @param txn_kv KV store the transaction is created on
+/// @param cntl   controller supplying the request URI and the JSON body
+/// @return on success a text reply listing the original value (hex), the key
+///         (hex) and the original value (JSON); otherwise a JSON error reply
+HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* cntl) {
+    const brpc::URI& uri = cntl->http_request().uri();
+    std::string body = cntl->request_attachment().to_string();
+    LOG(INFO) << "set value, body=" << body;
+
+    // Resolve the target key: an explicit hex key takes precedence over params.
+    std::string key;
+    auto hex_key = http_query(uri, "key");
+    if (hex_key.empty()) {
+        auto encode_status = encode_key(uri, key);
+        if (encode_status.code() != MetaServiceCode::OK) return http_json_reply(encode_status);
+    } else {
+        key = unhex(hex_key);
+    }
+
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) [[unlikely]] {
+        auto msg = fmt::format("failed to create txn, err={}", err);
+        return http_json_reply(MetaServiceCode::KV_TXN_CREATE_ERR, msg);
+    }
+
+    // Look up the parsing functions registered for the requested key type.
+    std::string_view key_type = http_query(uri, "key_type");
+    auto entry = param_set.find(key_type);
+    if (entry == param_set.end()) {
+        std::string_view shown = key_type.empty() ? std::string_view("(empty)") : key_type;
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                               fmt::format("key_type not supported: {}", shown));
+    }
+
+    auto& json_to_pb = std::get<3>(entry->second);
+    std::shared_ptr<google::protobuf::Message> pb_to_save = json_to_pb(body);
+    if (!pb_to_save) {
+        LOG(WARNING) << "invalid input json value for key_type=" << key_type;
+        auto msg = fmt::format(
+                "invalid input json value, cannot parse json to pb, key_type={}", key_type);
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, msg);
+    }
+    LOG(INFO) << "parsed pb to save key_type=" << key_type << " key=" << hex(key)
+              << " pb_to_save=" << proto_to_json(*pb_to_save);
+
+    // ATTN:
+    // StatsTabletKey is special, it has a series of keys, we only handle the base stat key.
+    // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are split into multiple KVs.
+    ValueBuf value;
+    err = cloud::get(txn.get(), key, &value, true);
+    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR,
+                               fmt::format("failed to get kv, key={}", hex(key)));
+    }
+    // A missing key is acceptable: the KV to set may not exist yet.
+    const bool kv_found = (err == TxnErrorCode::TXN_OK);
+
+    const std::vector<std::string> existing_keys = value.keys();
+    std::stringstream hex_keys;
+    for (const auto& existing : existing_keys) hex_keys << hex(existing) << ", ";
+    LOG(INFO) << "set value, key_type=" << key_type << " " << existing_keys.size() << " keys=["
+              << hex_keys.str() << "]";
+
+    // Render the currently stored value so it can be logged and returned.
+    std::string original_value_json;
+    if (kv_found) {
+        auto& stored_value_to_json = std::get<2>(entry->second);
+        original_value_json = stored_value_to_json(value);
+        if (original_value_json.empty()) [[unlikely]] {
+            LOG(WARNING) << "failed to parse value, key=" << hex(key)
+                         << " val=" << hex(value.value());
+            return http_json_reply(MetaServiceCode::UNDEFINED_ERR,
+                                   fmt::format("failed to parse value, key={}", hex(key)));
+        }
+        LOG(INFO) << "original_value_json=" << original_value_json;
+    }
+
+    std::string serialized_value_to_save = pb_to_save->SerializeAsString();
+    if (serialized_value_to_save.empty()) {
+        LOG(WARNING) << "failed to serialize, key=" << hex(key);
+        return http_json_reply(MetaServiceCode::UNDEFINED_ERR,
+                               fmt::format("failed to serialize, key={}", hex(key)));
+    }
+
+    // The original KVs must be removed first: they may span a range of keys, and
+    // the final number of keys may be smaller than the initial one.
+    if (kv_found) value.remove(txn.get());
+
+    // TODO(gavin): use cloud::put() to deal with split-multi-kv and special keys
+    // StatsTabletKey is special, it has a series of keys, we only handle the base stat key.
+    // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are split into multiple KVs.
+    txn->put(key, serialized_value_to_save);
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        std::stringstream failure;
+        failure << "failed to commit txn when set value, err=" << err << " key=" << hex(key);
+        LOG(WARNING) << failure.str();
+        return http_json_reply(MetaServiceCode::UNDEFINED_ERR, failure.str());
+    }
+    LOG(WARNING) << "set_value saved, key=" << hex(key);
+
+    // Plain-text summary: one "name=value" pair per line.
+    std::stringstream reply;
+    reply << "original_value_hex=" << hex(value.value()) << "\n";
+    reply << "key_hex=" << hex(key) << "\n";
+    reply << "original_value_json=" << original_value_json << "\n";
+    return http_text_reply(MetaServiceCode::OK, "", reply.str());
+}
+
HttpResponse process_http_encode_key(const brpc::URI& uri) {
std::string key;
auto st = encode_key(uri, key);
diff --git a/cloud/src/meta-service/meta_service_http.cpp
b/cloud/src/meta-service/meta_service_http.cpp
index c6eb07010c4..53d9dbbef83 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -158,7 +158,8 @@ HttpResponse http_json_reply(MetaServiceCode code, const
std::string& msg,
return {status_code, msg, sb.GetString()};
}
-static std::string format_http_request(const brpc::HttpHeader& request) {
+static std::string format_http_request(brpc::Controller* cntl) {
+ const brpc::HttpHeader& request = cntl->http_request();
auto& unresolved_path = request.unresolved_path();
auto& uri = request.uri();
std::stringstream ss;
@@ -173,6 +174,8 @@ static std::string format_http_request(const
brpc::HttpHeader& request) {
for (auto it = request.HeaderBegin(); it != request.HeaderEnd(); ++it) {
ss << "\n" << it->first << ":" << it->second;
}
+ std::string body = cntl->request_attachment().to_string();
+ ss << "\nbody=" << (body.empty() ? "(empty)" : body);
return ss.str();
}
@@ -509,6 +512,10 @@ static HttpResponse process_get_value(MetaServiceImpl*
service, brpc::Controller
return process_http_get_value(service->txn_kv().get(),
ctrl->http_request().uri());
}
+// HTTP handler registered as "set_value": hands the controller and the
+// service's TxnKv to process_http_set_value() and returns its response.
+static HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl) {
+    auto txn_kv = service->txn_kv();
+    return process_http_set_value(txn_kv.get(), ctrl);
+}
+
// show all key ranges and their count.
static HttpResponse process_show_meta_ranges(MetaServiceImpl* service,
brpc::Controller* ctrl) {
auto txn_kv = std::dynamic_pointer_cast<FdbTxnKv>(service->txn_kv());
@@ -737,6 +744,7 @@ void
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"decode_key", process_decode_key},
{"encode_key", process_encode_key},
{"get_value", process_get_value},
+ {"set_value", process_set_value},
{"show_meta_ranges", process_show_meta_ranges},
{"txn_lazy_commit", process_txn_lazy_commit},
{"fix_tablet_stats", process_fix_tablet_stats},
@@ -744,11 +752,11 @@ void
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"v1/decode_key", process_decode_key},
{"v1/encode_key", process_encode_key},
{"v1/get_value", process_get_value},
+ {"v1/set_value", process_set_value},
{"v1/show_meta_ranges", process_show_meta_ranges},
{"v1/txn_lazy_commit", process_txn_lazy_commit},
{"v1/injection_point", process_injection_point},
- // for get
- {"get_instance", process_get_instance_info},
+ {"v1/fix_tablet_stats", process_fix_tablet_stats},
// for get
{"get_instance", process_get_instance_info},
{"get_obj_store_info", process_get_obj_store_info},
@@ -785,7 +793,7 @@ void
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
// Prepare input request info
LOG(INFO) << "rpc from " << cntl->remote_side()
<< " request: " << cntl->http_request().uri().path();
- std::string http_request = format_http_request(cntl->http_request());
+ std::string http_request = format_http_request(cntl);
// Auth
auto token = http_query(cntl->http_request().uri(), "token");
diff --git a/cloud/src/meta-service/meta_service_http.h
b/cloud/src/meta-service/meta_service_http.h
index ead53f0630f..1dca1d3d64d 100644
--- a/cloud/src/meta-service/meta_service_http.h
+++ b/cloud/src/meta-service/meta_service_http.h
@@ -17,6 +17,7 @@
#pragma once
+#include <brpc/controller.h>
#include <brpc/uri.h>
#include <gen_cpp/cloud.pb.h>
@@ -41,6 +42,8 @@ HttpResponse http_json_reply(MetaServiceCode code, const
std::string& msg,
HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri);
+HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* ctrl);
+
HttpResponse process_http_encode_key(const brpc::URI& uri);
/// Return the query value or an empty string if not exists.
diff --git a/cloud/test/meta_service_http_test.cpp
b/cloud/test/meta_service_http_test.cpp
index 3afb2f66bf6..e51e6fa819b 100644
--- a/cloud/test/meta_service_http_test.cpp
+++ b/cloud/test/meta_service_http_test.cpp
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+#include "meta-service/meta_service_http.h"
+
#include <brpc/channel.h>
#include <brpc/controller.h>
#include <brpc/server.h>
@@ -1761,4 +1763,75 @@ TEST(MetaServiceHttpTest, UpdateConfig) {
}
}
+// Stores a RowsetMetaCloudPB under a MetaRowsetKey, overwrites it through
+// process_http_set_value(), then checks both the reply body and the stored value.
+TEST(HttpEncodeKeyTest, ProcessHttpSetValue) {
+    // Builds a rowset meta that differs only in row count and data size.
+    auto make_rowset_meta = [](int64_t num_rows, int64_t data_disk_size) {
+        RowsetMetaCloudPB meta;
+        meta.set_rowset_id_v2("12345");
+        meta.set_rowset_id(0);
+        meta.set_tablet_id(67890);
+        meta.set_num_rows(num_rows);
+        meta.set_data_disk_size(data_disk_size);
+        return meta;
+    };
+
+    // Key under which the rowset meta lives
+    std::string instance_id = "test_instance";
+    int64_t tablet_id = 67890;
+    int64_t version = 10086;
+    MetaRowsetKeyInfo key_info {instance_id, tablet_id, version};
+    std::string initial_key = meta_rowset_key(key_info);
+
+    // Seed the KV store with the initial rowset meta
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    RowsetMetaCloudPB initial_rowset_meta = make_rowset_meta(100, 1024);
+    std::string serialized_initial = initial_rowset_meta.SerializeAsString();
+    txn->put(initial_key, serialized_initial);
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    // JSON body carrying the updated row count and size
+    RowsetMetaCloudPB new_rowset_meta = make_rowset_meta(200, 2048);
+    std::string json_value = proto_to_json(new_rowset_meta);
+
+    // Request: key parameters in the URI, JSON in the attachment
+    brpc::Controller cntl;
+    brpc::URI& request_uri = cntl.http_request().uri();
+    request_uri._path = "/meta-service/http/set_value";
+    request_uri.SetQuery("key_type", "MetaRowsetKey");
+    request_uri.SetQuery("instance_id", instance_id);
+    request_uri.SetQuery("tablet_id", std::to_string(tablet_id));
+    request_uri.SetQuery("version", std::to_string(version));
+    cntl.request_attachment().append(json_value);
+
+    // Test update
+    auto response = process_http_set_value(txn_kv.get(), &cntl);
+    EXPECT_EQ(response.status_code, 200) << response.msg;
+    std::stringstream expected_body;
+    expected_body << "original_value_hex=" << hex(serialized_initial) << "\n";
+    expected_body << "key_hex=" << hex(initial_key) << "\n";
+    expected_body << "original_value_json=" << proto_to_json(initial_rowset_meta) << "\n";
+    EXPECT_EQ(response.body, expected_body.str());
+
+    // Verify update
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string updated_value;
+    ASSERT_EQ(txn->get(initial_key, &updated_value), TxnErrorCode::TXN_OK);
+
+    RowsetMetaCloudPB updated_rowset_meta;
+    ASSERT_TRUE(updated_rowset_meta.ParseFromString(updated_value));
+    EXPECT_EQ(updated_rowset_meta.rowset_id_v2(), "12345");
+    EXPECT_EQ(updated_rowset_meta.tablet_id(), 67890);
+    EXPECT_EQ(updated_rowset_meta.num_rows(), 200);
+    EXPECT_EQ(updated_rowset_meta.data_disk_size(), 2048);
+}
+
} // namespace doris::cloud
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]