This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 04d83f4c274 [opt](meta-service) Implement set_value API for meta-servce (#49052) 04d83f4c274 is described below commit 04d83f4c274bf9d6cd35d5f8292412b862df65d5 Author: Gavin Chou <ga...@selectdb.com> AuthorDate: Sat Mar 22 14:10:38 2025 +0800 [opt](meta-service) Implement set_value API for meta-servce (#49052) Implement the set_value API for meta-service. This API is intended for maintenance only and is dangerous. It accepts the factors of a key plus the JSON representation of the value to be set, and returns the hex-encoded key and the original value (as hex and as JSON), if one exists. This operation is logged. For the other supported key types and the factors each one requires, see param_set in http_encode_key.cpp. usage ``` curl "${meta_service_ip}:${ms_port}/MetaService/http/set_value?instance_id=${instance_id}&token=${token}&key_type=${key_type}&${factors_for_key}" -d "${json_value}" ``` example ``` $ curl "127.1:6000/MetaService/http/set_value?key_type=MetaRowsetKey&instance_id=gavin_debug_instance_doris&tablet_id=1741892112744&version=3&token=greedisgood9999" -d '{"rowset_id":"0","partition_id":"1741892112739","tablet_id":"1741892112744","txn_id":"46080","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"540","data_disk_size":"540","index_disk_size":"0","empty": [...] 
' original_value_hex=080010e3a2f186d93218e8a2f186d9322080e8022800300138014003480350005801609c04689c04700088010092011508838bc181cb8a8ad15e10c986c4c8bfbfe0e59301980100a0019ed4ccbe06b00100ba0130303230303030303030303030303030313033346133323464363838306632346265333366333930663866363139343961c2010131d0019ed4ccbe06da01160a090280000001026161611209028000000102616161e801dec4cdbe06980302a00300a80300b20300a0069c04a806e5a2f186d932b00600b80601c00600d00600 key_hex=01106d657461000110676176696e5f64656275675f696e7374616e63655f646f726973000110726f777365740001120000019590dc5168120000000000000003 original_value_json={"rowset_id":"0","partition_id":"1741892112739","tablet_id":"1741892112744","txn_id":"46080","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"540","data_disk_size":"540","index_disk_size":"0","empty":false,"load_id":{"hi":"6819057129990669699","lo":"-7796995410646465719"},"delete_flag":false,"creation_time":"1741892126","num_segments":"0","rowset_ [...] 
``` --- cloud/src/common/util.cpp | 24 ++++ cloud/src/common/util.h | 6 + cloud/src/meta-service/http_encode_key.cpp | 208 ++++++++++++++++++++++----- cloud/src/meta-service/meta_service_http.cpp | 16 ++- cloud/src/meta-service/meta_service_http.h | 3 + cloud/test/meta_service_http_test.cpp | 73 ++++++++++ 6 files changed, 289 insertions(+), 41 deletions(-) diff --git a/cloud/src/common/util.cpp b/cloud/src/common/util.cpp index 8d1c8fed983..50f29afb0ba 100644 --- a/cloud/src/common/util.cpp +++ b/cloud/src/common/util.cpp @@ -247,6 +247,30 @@ bool ValueBuf::to_pb(google::protobuf::Message* pb) const { return pb->ParseFromZeroCopyStream(&merge_stream); } +std::string ValueBuf::value() const { + butil::IOBuf merge; + for (auto&& it : iters) { + it->reset(); + while (it->has_next()) { + auto [k, v] = it->next(); + merge.append_user_data((void*)v.data(), v.size(), +[](void*) {}); + } + } + return merge.to_string(); +} + +std::vector<std::string> ValueBuf::keys() const { + std::vector<std::string> ret; + for (auto&& it : iters) { + it->reset(); + while (it->has_next()) { + auto [k, _] = it->next(); + ret.push_back({k.data(), k.size()}); + } + } + return ret; +} + void ValueBuf::remove(Transaction* txn) const { for (auto&& it : iters) { it->reset(); diff --git a/cloud/src/common/util.h b/cloud/src/common/util.h index de37c2f4d9b..8f2e8aa077e 100644 --- a/cloud/src/common/util.h +++ b/cloud/src/common/util.h @@ -89,6 +89,12 @@ struct ValueBuf { // Return TXN_OK for success get a key, TXN_KEY_NOT_FOUND for key not found, otherwise for error. 
TxnErrorCode get(Transaction* txn, std::string_view key, bool snapshot = false); + // return the merged value in ValueBuf + std::string value() const; + + // return all keys in ValueBuf, if the value is not splitted, size of keys is 1 + std::vector<std::string> keys() const; + std::vector<std::unique_ptr<RangeGetIterator>> iters; int8_t ver {-1}; }; diff --git a/cloud/src/meta-service/http_encode_key.cpp b/cloud/src/meta-service/http_encode_key.cpp index 4d05f0121b0..728b52df2d8 100644 --- a/cloud/src/meta-service/http_encode_key.cpp +++ b/cloud/src/meta-service/http_encode_key.cpp @@ -15,9 +15,12 @@ // specific language governing permissions and limitations // under the License. +#include <brpc/controller.h> #include <brpc/uri.h> #include <fmt/format.h> #include <gen_cpp/cloud.pb.h> +#include <google/protobuf/message.h> +#include <google/protobuf/util/json_util.h> #include <bit> #include <iomanip> @@ -29,6 +32,7 @@ #include <utility> #include <vector> +#include "common/logging.h" #include "common/util.h" #include "cpp/sync_point.h" #include "meta-service/codec.h" @@ -80,6 +84,21 @@ struct KeyInfoSetter { }; // clang-format on +template <typename Message> +static std::shared_ptr<Message> parse_json(const std::string& json) { + static_assert(std::is_base_of_v<google::protobuf::Message, Message>); + auto ret = std::make_shared<Message>(); + auto st = google::protobuf::util::JsonStringToMessage(json, ret.get()); + if (st.ok()) return ret; + std::string err = "failed to strictly parse json message, error: " + st.ToString(); + LOG_WARNING(err).tag("json", json); + // ignore unknown fields + // google::protobuf::util::JsonParseOptions json_parse_options; + // json_parse_options.ignore_unknown_fields = true; + // return google::protobuf::util::JsonStringToMessage(body, req, json_parse_options); + return nullptr; +} + using param_type = const std::vector<const std::string*>; template <class ProtoType> @@ -177,41 +196,45 @@ static std::string parse_tablet_stats(const 
ValueBuf& buf) { } // See keys.h to get all types of key, e.g: MetaRowsetKeyInfo -// key_type -> {{param1, param2 ...}, key_encoding_func, value_parsing_func} -// where params are the input for key_encoding_func +// key_type -> {{param_name1, param_name2 ...}, key_encoding_func, serialized_pb_to_json_parsing_func, json_to_proto_parsing_func} +// where param names are the input for key_encoding_func // clang-format off +// key_type static std::unordered_map<std::string_view, - std::tuple<std::vector<std::string_view>, std::function<std::string(param_type&)>, std::function<std::string(const ValueBuf&)>>> param_set { - {"InstanceKey", {{"instance_id"}, [](param_type& p) { return instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get()); } , parse<InstanceInfoPB>}} , - {"TxnLabelKey", {{"instance_id", "db_id", "label"}, [](param_type& p) { return txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get()); } , parse_txn_label}} , - {"TxnInfoKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get()); } , parse<TxnInfoPB>}} , - {"TxnIndexKey", {{"instance_id", "txn_id"}, [](param_type& p) { return txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get()); } , parse<TxnIndexPB>}} , - {"TxnRunningKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get()); } , parse<TxnRunningPB>}} , - {"PartitionVersionKey", {{"instance_id", "db_id", "tbl_id", "partition_id"}, [](param_type& p) { return partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get()); } , parse<VersionPB>}} , - {"TableVersionKey", {{"instance_id", "db_id", "tbl_id"}, [](param_type& p) { return table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get()); } , parse<VersionPB>}} , - {"MetaRowsetKey", {{"instance_id", "tablet_id", "version"}, [](param_type& p) { return meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get()); } , parse<doris::RowsetMetaCloudPB>}} , - 
{"MetaRowsetTmpKey", {{"instance_id", "txn_id", "tablet_id"}, [](param_type& p) { return meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get()); } , parse<doris::RowsetMetaCloudPB>}} , - {"MetaTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get()); } , parse<doris::TabletMetaCloudPB>}} , - {"MetaTabletIdxKey", {{"instance_id", "tablet_id"}, [](param_type& p) { return meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get()); } , parse<TabletIndexPB>}} , - {"RecycleIndexKey", {{"instance_id", "index_id"}, [](param_type& p) { return recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get()); } , parse<RecycleIndexPB>}} , - {"RecyclePartKey", {{"instance_id", "part_id"}, [](param_type& p) { return recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get()); } , parse<RecyclePartitionPB>}} , - {"RecycleRowsetKey", {{"instance_id", "tablet_id", "rowset_id"}, [](param_type& p) { return recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get()); } , parse<RecycleRowsetPB>}} , - {"RecycleTxnKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get()); } , parse<RecycleTxnPB>}} , - {"StatsTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get()); } , parse_tablet_stats}} , - {"JobTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get()); } , parse<TabletJobInfoPB>}} , - {"CopyJobKey", {{"instance_id", "stage_id", "table_id", "copy_id", "group_id"}, [](param_type& p) { return copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get()); } , parse<CopyJobPB>}} , - {"CopyFileKey", {{"instance_id", "stage_id", "table_id", "obj_key", "obj_etag"}, [](param_type& p) { 
return copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get()); } , parse<CopyFilePB>}} , - {"RecycleStageKey", {{"instance_id", "stage_id"}, [](param_type& p) { return recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get()); } , parse<RecycleStagePB>}} , - {"JobRecycleKey", {{"instance_id"}, [](param_type& p) { return job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get()); } , parse<JobRecyclePB>}} , - {"MetaSchemaKey", {{"instance_id", "index_id", "schema_version"}, [](param_type& p) { return meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get()); } , parse_tablet_schema}} , - {"MetaDeleteBitmap", {{"instance_id", "tablet_id", "rowest_id", "version", "seg_id"}, [](param_type& p) { return meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get()); } , parse_delete_bitmap}} , - {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id", "partition_id"}, [](param_type& p) { return meta_delete_bitmap_update_lock_key( KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get()); }, parse<DeleteBitmapUpdateLockPB>}}, - {"MetaPendingDeleteBitmap", {{"instance_id", "tablet_id"}, [](param_type& p) { return meta_pending_delete_bitmap_key( KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get()); } , parse<PendingDeleteBitmapPB>}} , - {"RLJobProgressKey", {{"instance_id", "db_id", "job_id"}, [](param_type& p) { return rl_job_progress_key_info( KeyInfoSetter<RLJobProgressKeyInfo>{p}.get()); } , parse<RoutineLoadProgressPB>}} , - {"MetaServiceRegistryKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_registry_key(); } , parse<ServiceRegistryPB>}} , - {"MetaServiceArnInfoKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_arn_info_key(); } , parse<RamUserPB>}} , - {"MetaServiceEncryptionKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_encryption_key_info_key(); } , parse<EncryptionKeyInfoPB>}} , - {"StorageVaultKey", {{"instance_id", "vault_id"}, 
[](param_type& p) { return storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get()); } , parse<StorageVaultPB>}} ,}; + // params key_encoding_func serialized_pb_to_json_parsing_func json_to_proto_parsing_func + std::tuple<std::vector<std::string_view>, std::function<std::string(param_type&)>, std::function<std::string(const ValueBuf&)>, std::function<std::shared_ptr<google::protobuf::Message>(const std::string&)>>> param_set { + {"InstanceKey", {{"instance_id"}, [](param_type& p) { return instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get()); }, parse<InstanceInfoPB> , parse_json<InstanceInfoPB>}}, + {"TxnLabelKey", {{"instance_id", "db_id", "label"}, [](param_type& p) { return txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get()); }, parse_txn_label , parse_json<TxnLabelPB>}}, + {"TxnInfoKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get()); }, parse<TxnInfoPB> , parse_json<TxnInfoPB>}}, + {"TxnIndexKey", {{"instance_id", "txn_id"}, [](param_type& p) { return txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get()); }, parse<TxnIndexPB> , parse_json<TxnIndexPB>}}, + {"TxnRunningKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get()); }, parse<TxnRunningPB> , parse_json<TxnRunningPB>}}, + {"PartitionVersionKey", {{"instance_id", "db_id", "tbl_id", "partition_id"}, [](param_type& p) { return partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get()); }, parse<VersionPB> , parse_json<VersionPB>}}, + {"TableVersionKey", {{"instance_id", "db_id", "tbl_id"}, [](param_type& p) { return table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get()); }, parse<VersionPB> , parse_json<VersionPB>}}, + {"MetaRowsetKey", {{"instance_id", "tablet_id", "version"}, [](param_type& p) { return meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get()); }, parse<doris::RowsetMetaCloudPB> , 
parse_json<doris::RowsetMetaCloudPB>}}, + {"MetaRowsetTmpKey", {{"instance_id", "txn_id", "tablet_id"}, [](param_type& p) { return meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get()); }, parse<doris::RowsetMetaCloudPB> , parse_json<doris::RowsetMetaCloudPB>}}, + {"MetaTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get()); }, parse<doris::TabletMetaCloudPB> , parse_json<doris::TabletMetaCloudPB>}}, + {"MetaTabletIdxKey", {{"instance_id", "tablet_id"}, [](param_type& p) { return meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get()); }, parse<TabletIndexPB> , parse_json<TabletIndexPB>}}, + {"RecycleIndexKey", {{"instance_id", "index_id"}, [](param_type& p) { return recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get()); }, parse<RecycleIndexPB> , parse_json<RecycleIndexPB>}}, + {"RecyclePartKey", {{"instance_id", "part_id"}, [](param_type& p) { return recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get()); }, parse<RecyclePartitionPB> , parse_json<RecyclePartitionPB>}}, + {"RecycleRowsetKey", {{"instance_id", "tablet_id", "rowset_id"}, [](param_type& p) { return recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get()); }, parse<RecycleRowsetPB> , parse_json<RecycleRowsetPB>}}, + {"RecycleTxnKey", {{"instance_id", "db_id", "txn_id"}, [](param_type& p) { return recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get()); }, parse<RecycleTxnPB> , parse_json<RecycleTxnPB>}}, + {"StatsTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get()); }, parse_tablet_stats , parse_json<TabletStatsPB>}}, + {"JobTabletKey", {{"instance_id", "table_id", "index_id", "part_id", "tablet_id"}, [](param_type& p) { return job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get()); }, parse<TabletJobInfoPB> , 
parse_json<TabletJobInfoPB>}}, + {"CopyJobKey", {{"instance_id", "stage_id", "table_id", "copy_id", "group_id"}, [](param_type& p) { return copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get()); }, parse<CopyJobPB> , parse_json<CopyJobPB>}}, + {"CopyFileKey", {{"instance_id", "stage_id", "table_id", "obj_key", "obj_etag"}, [](param_type& p) { return copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get()); }, parse<CopyFilePB> , parse_json<CopyFilePB>}}, + {"RecycleStageKey", {{"instance_id", "stage_id"}, [](param_type& p) { return recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get()); }, parse<RecycleStagePB> , parse_json<RecycleStagePB>}}, + {"JobRecycleKey", {{"instance_id"}, [](param_type& p) { return job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get()); }, parse<JobRecyclePB> , parse_json<JobRecyclePB>}}, + {"MetaSchemaKey", {{"instance_id", "index_id", "schema_version"}, [](param_type& p) { return meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get()); }, parse_tablet_schema , parse_json<doris::TabletSchemaCloudPB>}}, + {"MetaDeleteBitmap", {{"instance_id", "tablet_id", "rowest_id", "version", "seg_id"}, [](param_type& p) { return meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get()); }, parse_delete_bitmap , parse_json<DeleteBitmapPB>}}, + {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id", "partition_id"}, [](param_type& p) { return meta_delete_bitmap_update_lock_key(KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get()); }, parse<DeleteBitmapUpdateLockPB> , parse_json<DeleteBitmapUpdateLockPB>}}, + {"MetaPendingDeleteBitmap", {{"instance_id", "tablet_id"}, [](param_type& p) { return meta_pending_delete_bitmap_key(KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get()); }, parse<PendingDeleteBitmapPB> , parse_json<PendingDeleteBitmapPB>}}, + {"RLJobProgressKey", {{"instance_id", "db_id", "job_id"}, [](param_type& p) { return rl_job_progress_key_info(KeyInfoSetter<RLJobProgressKeyInfo>{p}.get()); }, 
parse<RoutineLoadProgressPB> , parse_json<RoutineLoadProgressPB>}}, + {"StorageVaultKey", {{"instance_id", "vault_id"}, [](param_type& p) { return storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get()); }, parse<StorageVaultPB> , parse_json<StorageVaultPB>}}, + {"MetaSchemaPBDictionaryKey", {{"instance_id", "index_id"}, [](param_type& p) { return meta_schema_pb_dictionary_key(KeyInfoSetter<MetaSchemaPBDictionaryInfo>{p}.get()); }, parse<SchemaCloudDictionary> , parse_json<SchemaCloudDictionary>}}, + {"MetaServiceRegistryKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_registry_key(); }, parse<ServiceRegistryPB> , parse_json<ServiceRegistryPB>}}, + {"MetaServiceArnInfoKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_arn_info_key(); }, parse<RamUserPB> , parse_json<RamUserPB>}}, + {"MetaServiceEncryptionKey", {std::vector<std::string_view> {}, [](param_type& p) { return system_meta_service_encryption_key_info_key(); }, parse<EncryptionKeyInfoPB> , parse_json<EncryptionKeyInfoPB>}}, +}; // clang-format on static MetaServiceResponseStatus encode_key(const brpc::URI& uri, std::string& key) { @@ -225,9 +248,10 @@ static MetaServiceResponseStatus encode_key(const brpc::URI& uri, std::string& k (key_type.empty() ? 
"(empty)" : key_type))); return status; } + auto& key_params = std::get<0>(it->second); std::remove_cv_t<param_type> params; - params.reserve(std::get<0>(it->second).size()); - for (auto& i : std::get<0>(it->second)) { + params.reserve(key_params.size()); + for (auto& i : key_params) { auto p = uri.GetQuery(i.data()); if (p == nullptr || p->empty()) { status.set_code(MetaServiceCode::INVALID_ARGUMENT); @@ -236,7 +260,8 @@ static MetaServiceResponseStatus encode_key(const brpc::URI& uri, std::string& k } params.emplace_back(p); } - key = std::get<1>(it->second)(params); + auto& key_encoding_function = std::get<1>(it->second); + key = key_encoding_function(params); return status; } @@ -292,7 +317,8 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri) { return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR, fmt::format("failed to get kv, key={}", hex(key))); } - auto readable_value = std::get<2>(it->second)(value); + auto& value_parsing_function = std::get<2>(it->second); + auto readable_value = value_parsing_function(value); if (readable_value.empty()) [[unlikely]] { return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR, fmt::format("failed to parse value, key={}", hex(key))); @@ -300,6 +326,114 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri) { return http_text_reply(MetaServiceCode::OK, "", readable_value); } +HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* cntl) { + const brpc::URI& uri = cntl->http_request().uri(); + std::string body = cntl->request_attachment().to_string(); + LOG(INFO) << "set value, body=" << body; + + std::string key; + if (auto hex_key = http_query(uri, "key"); !hex_key.empty()) { + key = unhex(hex_key); + } else { // Encode key from params + auto st = encode_key(uri, key); + if (st.code() != MetaServiceCode::OK) { + return http_json_reply(st); + } + } + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != 
TxnErrorCode::TXN_OK) [[unlikely]] { + return http_json_reply(MetaServiceCode::KV_TXN_CREATE_ERR, + fmt::format("failed to create txn, err={}", err)); + } + + std::string_view key_type = http_query(uri, "key_type"); + auto it = param_set.find(key_type); + if (it == param_set.end()) { + return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, + fmt::format("key_type not supported: {}", + (key_type.empty() ? "(empty)" : key_type))); + } + auto& json_parsing_function = std::get<3>(it->second); + std::shared_ptr<google::protobuf::Message> pb_to_save = json_parsing_function(body); + if (pb_to_save == nullptr) { + LOG(WARNING) << "invalid input json value for key_type=" << key_type; + return http_json_reply( + MetaServiceCode::INVALID_ARGUMENT, + fmt::format("invalid input json value, cannot parse json to pb, key_type={}", + key_type)); + } + + LOG(INFO) << "parsed pb to save key_type=" << key_type << " key=" << hex(key) + << " pb_to_save=" << proto_to_json(*pb_to_save); + + // ATTN: + // StatsTabletKey is special, it has a series of keys, we only handle the base stat key + // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are splited in to multiple KV + ValueBuf value; + err = cloud::get(txn.get(), key, &value, true); + + bool kv_found = true; + if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { + // it is possible the key-value to set is non-existed + kv_found = false; + } else if (err != TxnErrorCode::TXN_OK) { + return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR, + fmt::format("failed to get kv, key={}", hex(key))); + } + + auto concat = [](const std::vector<std::string>& keys) { + std::stringstream s; + for (auto& i : keys) s << hex(i) << ", "; + return s.str(); + }; + LOG(INFO) << "set value, key_type=" << key_type << " " << value.keys().size() << " keys=[" + << concat(value.keys()) << "]"; + + std::string original_value_json; + if (kv_found) { + auto& serialized_value_to_json_parsing_function = std::get<2>(it->second); + original_value_json = 
serialized_value_to_json_parsing_function(value); + if (original_value_json.empty()) [[unlikely]] { + LOG(WARNING) << "failed to parse value, key=" << hex(key) + << " val=" << hex(value.value()); + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, + fmt::format("failed to parse value, key={}", hex(key))); + } else { + LOG(INFO) << "original_value_json=" << original_value_json; + } + } + std::string serialized_value_to_save = pb_to_save->SerializeAsString(); + if (serialized_value_to_save.empty()) { + LOG(WARNING) << "failed to serialize, key=" << hex(key); + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, + fmt::format("failed to serialize, key={}", hex(key))); + } + // we need to remove the original KVs, it may be a range of keys + // and the number of final keys may be less than the initial number of keys + if (kv_found) value.remove(txn.get()); + + // TODO(gavin): use cloud::put() to deal with split-multi-kv and special keys + // StatsTabletKey is special, it has a series of keys, we only handle the base stat key + // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are splited in to multiple KV + txn->put(key, serialized_value_to_save); + err = txn->commit(); + if (err != TxnErrorCode::TXN_OK) { + std::stringstream ss; + ss << "failed to commit txn when set value, err=" << err << " key=" << hex(key); + LOG(WARNING) << ss.str(); + return http_json_reply(MetaServiceCode::UNDEFINED_ERR, ss.str()); + } + LOG(WARNING) << "set_value saved, key=" << hex(key); + + std::stringstream final_json; + final_json << "original_value_hex=" << hex(value.value()) << "\n" + << "key_hex=" << hex(key) << "\n" + << "original_value_json=" << original_value_json << "\n"; + + return http_text_reply(MetaServiceCode::OK, "", final_json.str()); +} + HttpResponse process_http_encode_key(const brpc::URI& uri) { std::string key; auto st = encode_key(uri, key); diff --git a/cloud/src/meta-service/meta_service_http.cpp b/cloud/src/meta-service/meta_service_http.cpp 
index 78f2dc03a95..40c5d796d91 100644 --- a/cloud/src/meta-service/meta_service_http.cpp +++ b/cloud/src/meta-service/meta_service_http.cpp @@ -158,7 +158,8 @@ HttpResponse http_json_reply(MetaServiceCode code, const std::string& msg, return {status_code, msg, sb.GetString()}; } -static std::string format_http_request(const brpc::HttpHeader& request) { +static std::string format_http_request(brpc::Controller* cntl) { + const brpc::HttpHeader& request = cntl->http_request(); auto& unresolved_path = request.unresolved_path(); auto& uri = request.uri(); std::stringstream ss; @@ -173,6 +174,8 @@ static std::string format_http_request(const brpc::HttpHeader& request) { for (auto it = request.HeaderBegin(); it != request.HeaderEnd(); ++it) { ss << "\n" << it->first << ":" << it->second; } + std::string body = cntl->request_attachment().to_string(); + ss << "\nbody=" << (body.empty() ? "(empty)" : body); return ss.str(); } @@ -509,6 +512,10 @@ static HttpResponse process_get_value(MetaServiceImpl* service, brpc::Controller return process_http_get_value(service->txn_kv().get(), ctrl->http_request().uri()); } +static HttpResponse process_set_value(MetaServiceImpl* service, brpc::Controller* ctrl) { + return process_http_set_value(service->txn_kv().get(), ctrl); +} + // show all key ranges and their count. 
static HttpResponse process_show_meta_ranges(MetaServiceImpl* service, brpc::Controller* ctrl) { auto txn_kv = std::dynamic_pointer_cast<FdbTxnKv>(service->txn_kv()); @@ -737,6 +744,7 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"decode_key", process_decode_key}, {"encode_key", process_encode_key}, {"get_value", process_get_value}, + {"set_value", process_set_value}, {"show_meta_ranges", process_show_meta_ranges}, {"txn_lazy_commit", process_txn_lazy_commit}, {"injection_point", process_injection_point}, @@ -744,11 +752,11 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, {"v1/decode_key", process_decode_key}, {"v1/encode_key", process_encode_key}, {"v1/get_value", process_get_value}, + {"v1/set_value", process_set_value}, {"v1/show_meta_ranges", process_show_meta_ranges}, {"v1/txn_lazy_commit", process_txn_lazy_commit}, {"v1/injection_point", process_injection_point}, - // for get - {"get_instance", process_get_instance_info}, + {"v1/fix_tablet_stats", process_fix_tablet_stats}, // for get {"get_instance", process_get_instance_info}, {"get_obj_store_info", process_get_obj_store_info}, @@ -785,7 +793,7 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller, // Prepare input request info LOG(INFO) << "rpc from " << cntl->remote_side() << " request: " << cntl->http_request().uri().path(); - std::string http_request = format_http_request(cntl->http_request()); + std::string http_request = format_http_request(cntl); // Auth auto token = http_query(cntl->http_request().uri(), "token"); diff --git a/cloud/src/meta-service/meta_service_http.h b/cloud/src/meta-service/meta_service_http.h index ead53f0630f..1dca1d3d64d 100644 --- a/cloud/src/meta-service/meta_service_http.h +++ b/cloud/src/meta-service/meta_service_http.h @@ -17,6 +17,7 @@ #pragma once +#include <brpc/controller.h> #include <brpc/uri.h> #include <gen_cpp/cloud.pb.h> @@ -41,6 +42,8 @@ HttpResponse http_json_reply(MetaServiceCode 
code, const std::string& msg, HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri); +HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* ctrl); + HttpResponse process_http_encode_key(const brpc::URI& uri); /// Return the query value or an empty string if not exists. diff --git a/cloud/test/meta_service_http_test.cpp b/cloud/test/meta_service_http_test.cpp index 3afb2f66bf6..e51e6fa819b 100644 --- a/cloud/test/meta_service_http_test.cpp +++ b/cloud/test/meta_service_http_test.cpp @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#include "meta-service/meta_service_http.h" + #include <brpc/channel.h> #include <brpc/controller.h> #include <brpc/server.h> @@ -1761,4 +1763,75 @@ TEST(MetaServiceHttpTest, UpdateConfig) { } } +TEST(HttpEncodeKeyTest, ProcessHttpSetValue) { + auto txn_kv = std::make_shared<MemTxnKv>(); + std::unique_ptr<Transaction> txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + + // Create and serialize initial RowsetMeta + RowsetMetaCloudPB initial_rowset_meta; + initial_rowset_meta.set_rowset_id_v2("12345"); + initial_rowset_meta.set_rowset_id(0); + initial_rowset_meta.set_tablet_id(67890); + initial_rowset_meta.set_num_rows(100); + initial_rowset_meta.set_data_disk_size(1024); + std::string serialized_initial = initial_rowset_meta.SerializeAsString(); + + // Generate proper rowset meta key + std::string instance_id = "test_instance"; + int64_t tablet_id = 67890; + int64_t version = 10086; + + // Generate proper rowset meta key + MetaRowsetKeyInfo key_info {instance_id, tablet_id, version}; + std::string initial_key = meta_rowset_key(key_info); + + // Store initial RowsetMeta in TxnKv + txn->put(initial_key, serialized_initial); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + + // Create new RowsetMeta to update + RowsetMetaCloudPB new_rowset_meta; + new_rowset_meta.set_rowset_id_v2("12345"); + new_rowset_meta.set_rowset_id(0); + 
new_rowset_meta.set_tablet_id(67890); + new_rowset_meta.set_num_rows(200); // Updated row count + new_rowset_meta.set_data_disk_size(2048); // Updated size + std::string json_value = proto_to_json(new_rowset_meta); + + // Initialize cntl URI with required parameters + brpc::URI cntl_uri; + cntl_uri._path = "/meta-service/http/set_value"; + cntl_uri.SetQuery("key_type", "MetaRowsetKey"); + cntl_uri.SetQuery("instance_id", instance_id); + cntl_uri.SetQuery("tablet_id", std::to_string(tablet_id)); + cntl_uri.SetQuery("version", std::to_string(version)); + + brpc::Controller cntl; + cntl.request_attachment().append(json_value); + cntl.http_request().uri() = cntl_uri; + + // Test update + auto response = process_http_set_value(txn_kv.get(), &cntl); + EXPECT_EQ(response.status_code, 200) << response.msg; + std::stringstream final_json; + final_json << "original_value_hex=" << hex(initial_rowset_meta.SerializeAsString()) << "\n" + << "key_hex=" << hex(initial_key) << "\n" + << "original_value_json=" << proto_to_json(initial_rowset_meta) << "\n"; + // std::cout << "xxx " << final_json.str() << std::endl; + EXPECT_EQ(response.body, final_json.str()); + + // Verify update + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string updated_value; + ASSERT_EQ(txn->get(initial_key, &updated_value), TxnErrorCode::TXN_OK); + + RowsetMetaCloudPB updated_rowset_meta; + ASSERT_TRUE(updated_rowset_meta.ParseFromString(updated_value)); + EXPECT_EQ(updated_rowset_meta.rowset_id_v2(), "12345"); + EXPECT_EQ(updated_rowset_meta.tablet_id(), 67890); + EXPECT_EQ(updated_rowset_meta.num_rows(), 200); + EXPECT_EQ(updated_rowset_meta.data_disk_size(), 2048); +} + } // namespace doris::cloud --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org