This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 04d83f4c274 [opt](meta-service) Implement set_value API for 
meta-servce (#49052)
04d83f4c274 is described below

commit 04d83f4c274bf9d6cd35d5f8292412b862df65d5
Author: Gavin Chou <ga...@selectdb.com>
AuthorDate: Sat Mar 22 14:10:38 2025 +0800

    [opt](meta-service) Implement set_value API for meta-servce (#49052)
    
    Implement set_value API for meta-service, this API is for maintenance
    purpose, it is dangerous.
    This API accepts the factors of key and the json represented value to be
    set, and it returns the key encoded and original value (json), if
    exists. This operation will be logged.
    
    usage and example, check param_set in http_encode_key.cpp for more types
    of keys
    usage
    ```
    curl 
${meta_service_ip}:${ms_port}/MetaService/http/set_value?instance_id=${instance_id}&token=${token}&$key_type=${key_type}&{factors_for_key}"
 -d "${json_value}"
    ```
    
    example
    ```
    $ curl 
"127.1:6000/MetaService/http/set_value?key_type=MetaRowsetKey&instance_id=gavin_debug_instance_doris&tablet_id=1741892112744&version=3&token=greedisgood9999"
 -d 
'{"rowset_id":"0","partition_id":"1741892112739","tablet_id":"1741892112744","txn_id":"46080","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"540","data_disk_size":"540","index_disk_size":"0","empty":
 [...]
    '
    
original_value_hex=080010e3a2f186d93218e8a2f186d9322080e8022800300138014003480350005801609c04689c04700088010092011508838bc181cb8a8ad15e10c986c4c8bfbfe0e59301980100a0019ed4ccbe06b00100ba0130303230303030303030303030303030313033346133323464363838306632346265333366333930663866363139343961c2010131d0019ed4ccbe06da01160a090280000001026161611209028000000102616161e801dec4cdbe06980302a00300a80300b20300a0069c04a806e5a2f186d932b00600b80601c00600d00600
    
key_hex=01106d657461000110676176696e5f64656275675f696e7374616e63655f646f726973000110726f777365740001120000019590dc5168120000000000000003
    
original_value_json={"rowset_id":"0","partition_id":"1741892112739","tablet_id":"1741892112744","txn_id":"46080","tablet_schema_hash":0,"rowset_type":"BETA_ROWSET","rowset_state":"COMMITTED","start_version":"3","end_version":"3","version_hash":"0","num_rows":"1","total_disk_size":"540","data_disk_size":"540","index_disk_size":"0","empty":false,"load_id":{"hi":"6819057129990669699","lo":"-7796995410646465719"},"delete_flag":false,"creation_time":"1741892126","num_segments":"0","rowset_
 [...]
    ```
---
 cloud/src/common/util.cpp                    |  24 ++++
 cloud/src/common/util.h                      |   6 +
 cloud/src/meta-service/http_encode_key.cpp   | 208 ++++++++++++++++++++++-----
 cloud/src/meta-service/meta_service_http.cpp |  16 ++-
 cloud/src/meta-service/meta_service_http.h   |   3 +
 cloud/test/meta_service_http_test.cpp        |  73 ++++++++++
 6 files changed, 289 insertions(+), 41 deletions(-)

diff --git a/cloud/src/common/util.cpp b/cloud/src/common/util.cpp
index 8d1c8fed983..50f29afb0ba 100644
--- a/cloud/src/common/util.cpp
+++ b/cloud/src/common/util.cpp
@@ -247,6 +247,30 @@ bool ValueBuf::to_pb(google::protobuf::Message* pb) const {
     return pb->ParseFromZeroCopyStream(&merge_stream);
 }
 
+std::string ValueBuf::value() const {
+    butil::IOBuf merge;
+    for (auto&& it : iters) {
+        it->reset();
+        while (it->has_next()) {
+            auto [k, v] = it->next();
+            merge.append_user_data((void*)v.data(), v.size(), +[](void*) {});
+        }
+    }
+    return merge.to_string();
+}
+
+std::vector<std::string> ValueBuf::keys() const {
+    std::vector<std::string> ret;
+    for (auto&& it : iters) {
+        it->reset();
+        while (it->has_next()) {
+            auto [k, _] = it->next();
+            ret.push_back({k.data(), k.size()});
+        }
+    }
+    return ret;
+}
+
 void ValueBuf::remove(Transaction* txn) const {
     for (auto&& it : iters) {
         it->reset();
diff --git a/cloud/src/common/util.h b/cloud/src/common/util.h
index de37c2f4d9b..8f2e8aa077e 100644
--- a/cloud/src/common/util.h
+++ b/cloud/src/common/util.h
@@ -89,6 +89,12 @@ struct ValueBuf {
     // Return TXN_OK for success get a key, TXN_KEY_NOT_FOUND for key not 
found, otherwise for error.
     TxnErrorCode get(Transaction* txn, std::string_view key, bool snapshot = 
false);
 
+    // return the merged value in ValueBuf
+    std::string value() const;
+
+    // return all keys in ValueBuf, if the value is not splitted, size of keys 
is 1
+    std::vector<std::string> keys() const;
+
     std::vector<std::unique_ptr<RangeGetIterator>> iters;
     int8_t ver {-1};
 };
diff --git a/cloud/src/meta-service/http_encode_key.cpp 
b/cloud/src/meta-service/http_encode_key.cpp
index 4d05f0121b0..728b52df2d8 100644
--- a/cloud/src/meta-service/http_encode_key.cpp
+++ b/cloud/src/meta-service/http_encode_key.cpp
@@ -15,9 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <brpc/controller.h>
 #include <brpc/uri.h>
 #include <fmt/format.h>
 #include <gen_cpp/cloud.pb.h>
+#include <google/protobuf/message.h>
+#include <google/protobuf/util/json_util.h>
 
 #include <bit>
 #include <iomanip>
@@ -29,6 +32,7 @@
 #include <utility>
 #include <vector>
 
+#include "common/logging.h"
 #include "common/util.h"
 #include "cpp/sync_point.h"
 #include "meta-service/codec.h"
@@ -80,6 +84,21 @@ struct KeyInfoSetter {
 };
 // clang-format on
 
+template <typename Message>
+static std::shared_ptr<Message> parse_json(const std::string& json) {
+    static_assert(std::is_base_of_v<google::protobuf::Message, Message>);
+    auto ret = std::make_shared<Message>();
+    auto st = google::protobuf::util::JsonStringToMessage(json, ret.get());
+    if (st.ok()) return ret;
+    std::string err = "failed to strictly parse json message, error: " + 
st.ToString();
+    LOG_WARNING(err).tag("json", json);
+    // ignore unknown fields
+    // google::protobuf::util::JsonParseOptions json_parse_options;
+    // json_parse_options.ignore_unknown_fields = true;
+    // return google::protobuf::util::JsonStringToMessage(body, req, 
json_parse_options);
+    return nullptr;
+}
+
 using param_type = const std::vector<const std::string*>;
 
 template <class ProtoType>
@@ -177,41 +196,45 @@ static std::string parse_tablet_stats(const ValueBuf& 
buf) {
 }
 
 // See keys.h to get all types of key, e.g: MetaRowsetKeyInfo
-// key_type -> {{param1, param2 ...}, key_encoding_func, value_parsing_func}
-// where params are the input for key_encoding_func
+// key_type -> {{param_name1, param_name2 ...}, key_encoding_func, 
serialized_pb_to_json_parsing_func, json_to_proto_parsing_func}
+// where param names are the input for key_encoding_func
 // clang-format off
+//                        key_type
 static std::unordered_map<std::string_view,
-                          std::tuple<std::vector<std::string_view>, 
std::function<std::string(param_type&)>, std::function<std::string(const 
ValueBuf&)>>> param_set {
-    {"InstanceKey",                {{"instance_id"},                           
                      [](param_type& p) { return 
instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get()); }                        
              , parse<InstanceInfoPB>}}          ,
-    {"TxnLabelKey",                {{"instance_id", "db_id", "label"},         
                      [](param_type& p) { return 
txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get()); }                       
              , parse_txn_label}}                ,
-    {"TxnInfoKey",                 {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get()); }                         
              , parse<TxnInfoPB>}}               ,
-    {"TxnIndexKey",                {{"instance_id", "txn_id"},                 
                      [](param_type& p) { return 
txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get()); }                       
              , parse<TxnIndexPB>}}              ,
-    {"TxnRunningKey",              {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get()); }                   
              , parse<TxnRunningPB>}}            ,
-    {"PartitionVersionKey",        {{"instance_id", "db_id", "tbl_id", 
"partition_id"},              [](param_type& p) { return 
partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get()); }       
              , parse<VersionPB>}}               ,
-    {"TableVersionKey",            {{"instance_id", "db_id", "tbl_id"},        
                      [](param_type& p) { return 
table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get()); }               
              , parse<VersionPB>}}               ,
-    {"MetaRowsetKey",              {{"instance_id", "tablet_id", "version"},   
                      [](param_type& p) { return 
meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get()); }                   
              , parse<doris::RowsetMetaCloudPB>}}     ,
-    {"MetaRowsetTmpKey",           {{"instance_id", "txn_id", "tablet_id"},    
                      [](param_type& p) { return 
meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get()); }            
              , parse<doris::RowsetMetaCloudPB>}}     ,
-    {"MetaTabletKey",              {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get()); }                   
              , parse<doris::TabletMetaCloudPB>}}     ,
-    {"MetaTabletIdxKey",           {{"instance_id", "tablet_id"},              
                      [](param_type& p) { return 
meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get()); }            
              , parse<TabletIndexPB>}}           ,
-    {"RecycleIndexKey",            {{"instance_id", "index_id"},               
                      [](param_type& p) { return 
recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get()); }               
              , parse<RecycleIndexPB>}}          ,
-    {"RecyclePartKey",             {{"instance_id", "part_id"},                
                      [](param_type& p) { return 
recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get()); }            
              , parse<RecyclePartitionPB>}}      ,
-    {"RecycleRowsetKey",           {{"instance_id", "tablet_id", "rowset_id"}, 
                      [](param_type& p) { return 
recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get()); }             
              , parse<RecycleRowsetPB>}}         ,
-    {"RecycleTxnKey",              {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get()); }                   
              , parse<RecycleTxnPB>}}            ,
-    {"StatsTabletKey",             {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get()); }                 
              , parse_tablet_stats}}           ,
-    {"JobTabletKey",               {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get()); }                     
              , parse<TabletJobInfoPB>}}         ,
-    {"CopyJobKey",                 {{"instance_id", "stage_id", "table_id", 
"copy_id", "group_id"},  [](param_type& p) { return 
copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get()); }                         
              , parse<CopyJobPB>}}               ,
-    {"CopyFileKey",                {{"instance_id", "stage_id", "table_id", 
"obj_key", "obj_etag"},  [](param_type& p) { return 
copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get()); }                       
              , parse<CopyFilePB>}}              ,
-    {"RecycleStageKey",            {{"instance_id", "stage_id"},               
                      [](param_type& p) { return 
recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get()); }               
              , parse<RecycleStagePB>}}          ,
-    {"JobRecycleKey",              {{"instance_id"},                           
                      [](param_type& p) { return 
job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get()); }                     
              , parse<JobRecyclePB>}}            ,
-    {"MetaSchemaKey",              {{"instance_id", "index_id", 
"schema_version"},                   [](param_type& p) { return 
meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get()); }                   
              , parse_tablet_schema}}            ,
-    {"MetaDeleteBitmap",           {{"instance_id", "tablet_id", "rowest_id", 
"version", "seg_id"},  [](param_type& p) { return 
meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get()); }         
              , parse_delete_bitmap}}            ,
-    {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id", 
"partition_id"},                     [](param_type& p) { return 
meta_delete_bitmap_update_lock_key( 
KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get()); }, 
parse<DeleteBitmapUpdateLockPB>}},
-    {"MetaPendingDeleteBitmap",    {{"instance_id", "tablet_id"},              
                      [](param_type& p) { return 
meta_pending_delete_bitmap_key( 
KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get()); }       , 
parse<PendingDeleteBitmapPB>}}   ,
-    {"RLJobProgressKey",           {{"instance_id", "db_id", "job_id"},        
                      [](param_type& p) { return rl_job_progress_key_info( 
KeyInfoSetter<RLJobProgressKeyInfo>{p}.get()); }                    , 
parse<RoutineLoadProgressPB>}}   ,
-    {"MetaServiceRegistryKey",     {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_registry_key(); }                                           
              , parse<ServiceRegistryPB>}}       ,
-    {"MetaServiceArnInfoKey",      {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_arn_info_key(); }                                           
              , parse<RamUserPB>}}               ,
-    {"MetaServiceEncryptionKey",   {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_encryption_key_info_key(); }                                
              , parse<EncryptionKeyInfoPB>}}     ,
-    {"StorageVaultKey",           {{"instance_id", "vault_id"},                
              [](param_type& p) { return 
storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get()); }               
     , parse<StorageVaultPB>}}   ,};
+                                     // params                      
key_encoding_func                        serialized_pb_to_json_parsing_func     
      json_to_proto_parsing_func
+                          std::tuple<std::vector<std::string_view>, 
std::function<std::string(param_type&)>, std::function<std::string(const 
ValueBuf&)>, std::function<std::shared_ptr<google::protobuf::Message>(const 
std::string&)>>> param_set {
+    {"InstanceKey",                {{"instance_id"},                           
                      [](param_type& p) { return 
instance_key(KeyInfoSetter<InstanceKeyInfo>{p}.get());                          
            }, parse<InstanceInfoPB>           , parse_json<InstanceInfoPB>}},
+    {"TxnLabelKey",                {{"instance_id", "db_id", "label"},         
                      [](param_type& p) { return 
txn_label_key(KeyInfoSetter<TxnLabelKeyInfo>{p}.get());                         
            }, parse_txn_label                 , parse_json<TxnLabelPB>}},
+    {"TxnInfoKey",                 {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
txn_info_key(KeyInfoSetter<TxnInfoKeyInfo>{p}.get());                           
            }, parse<TxnInfoPB>                , parse_json<TxnInfoPB>}},
+    {"TxnIndexKey",                {{"instance_id", "txn_id"},                 
                      [](param_type& p) { return 
txn_index_key(KeyInfoSetter<TxnIndexKeyInfo>{p}.get());                         
            }, parse<TxnIndexPB>               , parse_json<TxnIndexPB>}},
+    {"TxnRunningKey",              {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
txn_running_key(KeyInfoSetter<TxnRunningKeyInfo>{p}.get());                     
            }, parse<TxnRunningPB>             , parse_json<TxnRunningPB>}},
+    {"PartitionVersionKey",        {{"instance_id", "db_id", "tbl_id", 
"partition_id"},              [](param_type& p) { return 
partition_version_key(KeyInfoSetter<PartitionVersionKeyInfo>{p}.get());         
            }, parse<VersionPB>                , parse_json<VersionPB>}},
+    {"TableVersionKey",            {{"instance_id", "db_id", "tbl_id"},        
                      [](param_type& p) { return 
table_version_key(KeyInfoSetter<TableVersionKeyInfo>{p}.get());                 
            }, parse<VersionPB>                , parse_json<VersionPB>}},
+    {"MetaRowsetKey",              {{"instance_id", "tablet_id", "version"},   
                      [](param_type& p) { return 
meta_rowset_key(KeyInfoSetter<MetaRowsetKeyInfo>{p}.get());                     
            }, parse<doris::RowsetMetaCloudPB> , 
parse_json<doris::RowsetMetaCloudPB>}},
+    {"MetaRowsetTmpKey",           {{"instance_id", "txn_id", "tablet_id"},    
                      [](param_type& p) { return 
meta_rowset_tmp_key(KeyInfoSetter<MetaRowsetTmpKeyInfo>{p}.get());              
            }, parse<doris::RowsetMetaCloudPB> , 
parse_json<doris::RowsetMetaCloudPB>}},
+    {"MetaTabletKey",              {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
meta_tablet_key(KeyInfoSetter<MetaTabletKeyInfo>{p}.get());                     
            }, parse<doris::TabletMetaCloudPB> , 
parse_json<doris::TabletMetaCloudPB>}},
+    {"MetaTabletIdxKey",           {{"instance_id", "tablet_id"},              
                      [](param_type& p) { return 
meta_tablet_idx_key(KeyInfoSetter<MetaTabletIdxKeyInfo>{p}.get());              
            }, parse<TabletIndexPB>            , parse_json<TabletIndexPB>}},
+    {"RecycleIndexKey",            {{"instance_id", "index_id"},               
                      [](param_type& p) { return 
recycle_index_key(KeyInfoSetter<RecycleIndexKeyInfo>{p}.get());                 
            }, parse<RecycleIndexPB>           , parse_json<RecycleIndexPB>}},
+    {"RecyclePartKey",             {{"instance_id", "part_id"},                
                      [](param_type& p) { return 
recycle_partition_key(KeyInfoSetter<RecyclePartKeyInfo>{p}.get());              
            }, parse<RecyclePartitionPB>       , 
parse_json<RecyclePartitionPB>}},
+    {"RecycleRowsetKey",           {{"instance_id", "tablet_id", "rowset_id"}, 
                      [](param_type& p) { return 
recycle_rowset_key(KeyInfoSetter<RecycleRowsetKeyInfo>{p}.get());               
            }, parse<RecycleRowsetPB>          , parse_json<RecycleRowsetPB>}},
+    {"RecycleTxnKey",              {{"instance_id", "db_id", "txn_id"},        
                      [](param_type& p) { return 
recycle_txn_key(KeyInfoSetter<RecycleTxnKeyInfo>{p}.get());                     
            }, parse<RecycleTxnPB>             , parse_json<RecycleTxnPB>}},
+    {"StatsTabletKey",             {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
stats_tablet_key(KeyInfoSetter<StatsTabletKeyInfo>{p}.get());                   
            }, parse_tablet_stats              , parse_json<TabletStatsPB>}},
+    {"JobTabletKey",               {{"instance_id", "table_id", "index_id", 
"part_id", "tablet_id"}, [](param_type& p) { return 
job_tablet_key(KeyInfoSetter<JobTabletKeyInfo>{p}.get());                       
            }, parse<TabletJobInfoPB>          , parse_json<TabletJobInfoPB>}},
+    {"CopyJobKey",                 {{"instance_id", "stage_id", "table_id", 
"copy_id", "group_id"},  [](param_type& p) { return 
copy_job_key(KeyInfoSetter<CopyJobKeyInfo>{p}.get());                           
            }, parse<CopyJobPB>                , parse_json<CopyJobPB>}},
+    {"CopyFileKey",                {{"instance_id", "stage_id", "table_id", 
"obj_key", "obj_etag"},  [](param_type& p) { return 
copy_file_key(KeyInfoSetter<CopyFileKeyInfo>{p}.get());                         
            }, parse<CopyFilePB>               , parse_json<CopyFilePB>}},
+    {"RecycleStageKey",            {{"instance_id", "stage_id"},               
                      [](param_type& p) { return 
recycle_stage_key(KeyInfoSetter<RecycleStageKeyInfo>{p}.get());                 
            }, parse<RecycleStagePB>           , parse_json<RecycleStagePB>}},
+    {"JobRecycleKey",              {{"instance_id"},                           
                      [](param_type& p) { return 
job_check_key(KeyInfoSetter<JobRecycleKeyInfo>{p}.get());                       
            }, parse<JobRecyclePB>             , parse_json<JobRecyclePB>}},
+    {"MetaSchemaKey",              {{"instance_id", "index_id", 
"schema_version"},                   [](param_type& p) { return 
meta_schema_key(KeyInfoSetter<MetaSchemaKeyInfo>{p}.get());                     
            }, parse_tablet_schema             , 
parse_json<doris::TabletSchemaCloudPB>}},
+    {"MetaDeleteBitmap",           {{"instance_id", "tablet_id", "rowest_id", 
"version", "seg_id"},  [](param_type& p) { return 
meta_delete_bitmap_key(KeyInfoSetter<MetaDeleteBitmapInfo>{p}.get());           
            }, parse_delete_bitmap             , parse_json<DeleteBitmapPB>}},
+    {"MetaDeleteBitmapUpdateLock", {{"instance_id", "table_id", 
"partition_id"},                     [](param_type& p) { return 
meta_delete_bitmap_update_lock_key(KeyInfoSetter<MetaDeleteBitmapUpdateLockInfo>{p}.get());
 }, parse<DeleteBitmapUpdateLockPB> , parse_json<DeleteBitmapUpdateLockPB>}},
+    {"MetaPendingDeleteBitmap",    {{"instance_id", "tablet_id"},              
                      [](param_type& p) { return 
meta_pending_delete_bitmap_key(KeyInfoSetter<MetaPendingDeleteBitmapInfo>{p}.get());
        }, parse<PendingDeleteBitmapPB>    , 
parse_json<PendingDeleteBitmapPB>}},
+    {"RLJobProgressKey",           {{"instance_id", "db_id", "job_id"},        
                      [](param_type& p) { return 
rl_job_progress_key_info(KeyInfoSetter<RLJobProgressKeyInfo>{p}.get());         
            }, parse<RoutineLoadProgressPB>    , 
parse_json<RoutineLoadProgressPB>}},
+    {"StorageVaultKey",            {{"instance_id", "vault_id"},               
                      [](param_type& p) { return 
storage_vault_key(KeyInfoSetter<StorageVaultKeyInfo>{p}.get());                 
            }, parse<StorageVaultPB>           , parse_json<StorageVaultPB>}},
+    {"MetaSchemaPBDictionaryKey",  {{"instance_id", "index_id"},               
                      [](param_type& p) { return 
meta_schema_pb_dictionary_key(KeyInfoSetter<MetaSchemaPBDictionaryInfo>{p}.get());
          }, parse<SchemaCloudDictionary>    , 
parse_json<SchemaCloudDictionary>}},
+    {"MetaServiceRegistryKey",     {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_registry_key();                                             
            }, parse<ServiceRegistryPB>        , 
parse_json<ServiceRegistryPB>}},
+    {"MetaServiceArnInfoKey",      {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_arn_info_key();                                             
            }, parse<RamUserPB>                , parse_json<RamUserPB>}},
+    {"MetaServiceEncryptionKey",   {std::vector<std::string_view> {},          
                      [](param_type& p) { return 
system_meta_service_encryption_key_info_key();                                  
            }, parse<EncryptionKeyInfoPB>      , 
parse_json<EncryptionKeyInfoPB>}},
+};
 // clang-format on
 
 static MetaServiceResponseStatus encode_key(const brpc::URI& uri, std::string& 
key) {
@@ -225,9 +248,10 @@ static MetaServiceResponseStatus encode_key(const 
brpc::URI& uri, std::string& k
                                    (key_type.empty() ? "(empty)" : key_type)));
         return status;
     }
+    auto& key_params = std::get<0>(it->second);
     std::remove_cv_t<param_type> params;
-    params.reserve(std::get<0>(it->second).size());
-    for (auto& i : std::get<0>(it->second)) {
+    params.reserve(key_params.size());
+    for (auto& i : key_params) {
         auto p = uri.GetQuery(i.data());
         if (p == nullptr || p->empty()) {
             status.set_code(MetaServiceCode::INVALID_ARGUMENT);
@@ -236,7 +260,8 @@ static MetaServiceResponseStatus encode_key(const 
brpc::URI& uri, std::string& k
         }
         params.emplace_back(p);
     }
-    key = std::get<1>(it->second)(params);
+    auto& key_encoding_function = std::get<1>(it->second);
+    key = key_encoding_function(params);
     return status;
 }
 
@@ -292,7 +317,8 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const 
brpc::URI& uri) {
         return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR,
                                fmt::format("failed to get kv, key={}", 
hex(key)));
     }
-    auto readable_value = std::get<2>(it->second)(value);
+    auto& value_parsing_function = std::get<2>(it->second);
+    auto readable_value = value_parsing_function(value);
     if (readable_value.empty()) [[unlikely]] {
         return http_json_reply(MetaServiceCode::PROTOBUF_PARSE_ERR,
                                fmt::format("failed to parse value, key={}", 
hex(key)));
@@ -300,6 +326,114 @@ HttpResponse process_http_get_value(TxnKv* txn_kv, const 
brpc::URI& uri) {
     return http_text_reply(MetaServiceCode::OK, "", readable_value);
 }
 
+HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* cntl) {
+    const brpc::URI& uri = cntl->http_request().uri();
+    std::string body = cntl->request_attachment().to_string();
+    LOG(INFO) << "set value, body=" << body;
+
+    std::string key;
+    if (auto hex_key = http_query(uri, "key"); !hex_key.empty()) {
+        key = unhex(hex_key);
+    } else { // Encode key from params
+        auto st = encode_key(uri, key);
+        if (st.code() != MetaServiceCode::OK) {
+            return http_json_reply(st);
+        }
+    }
+    std::unique_ptr<Transaction> txn;
+    TxnErrorCode err = txn_kv->create_txn(&txn);
+    if (err != TxnErrorCode::TXN_OK) [[unlikely]] {
+        return http_json_reply(MetaServiceCode::KV_TXN_CREATE_ERR,
+                               fmt::format("failed to create txn, err={}", 
err));
+    }
+
+    std::string_view key_type = http_query(uri, "key_type");
+    auto it = param_set.find(key_type);
+    if (it == param_set.end()) {
+        return http_json_reply(MetaServiceCode::INVALID_ARGUMENT,
+                               fmt::format("key_type not supported: {}",
+                                           (key_type.empty() ? "(empty)" : 
key_type)));
+    }
+    auto& json_parsing_function = std::get<3>(it->second);
+    std::shared_ptr<google::protobuf::Message> pb_to_save = 
json_parsing_function(body);
+    if (pb_to_save == nullptr) {
+        LOG(WARNING) << "invalid input json value for key_type=" << key_type;
+        return http_json_reply(
+                MetaServiceCode::INVALID_ARGUMENT,
+                fmt::format("invalid input json value, cannot parse json to 
pb, key_type={}",
+                            key_type));
+    }
+
+    LOG(INFO) << "parsed pb to save key_type=" << key_type << " key=" << 
hex(key)
+              << " pb_to_save=" << proto_to_json(*pb_to_save);
+
+    // ATTN:
+    // StatsTabletKey is special, it has a series of keys, we only handle the 
base stat key
+    // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are 
splited in to multiple KV
+    ValueBuf value;
+    err = cloud::get(txn.get(), key, &value, true);
+
+    bool kv_found = true;
+    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+        // it is possible the key-value to set is non-existed
+        kv_found = false;
+    } else if (err != TxnErrorCode::TXN_OK) {
+        return http_json_reply(MetaServiceCode::KV_TXN_GET_ERR,
+                               fmt::format("failed to get kv, key={}", 
hex(key)));
+    }
+
+    auto concat = [](const std::vector<std::string>& keys) {
+        std::stringstream s;
+        for (auto& i : keys) s << hex(i) << ", ";
+        return s.str();
+    };
+    LOG(INFO) << "set value, key_type=" << key_type << " " << 
value.keys().size() << " keys=["
+              << concat(value.keys()) << "]";
+
+    std::string original_value_json;
+    if (kv_found) {
+        auto& serialized_value_to_json_parsing_function = 
std::get<2>(it->second);
+        original_value_json = serialized_value_to_json_parsing_function(value);
+        if (original_value_json.empty()) [[unlikely]] {
+            LOG(WARNING) << "failed to parse value, key=" << hex(key)
+                         << " val=" << hex(value.value());
+            return http_json_reply(MetaServiceCode::UNDEFINED_ERR,
+                                   fmt::format("failed to parse value, 
key={}", hex(key)));
+        } else {
+            LOG(INFO) << "original_value_json=" << original_value_json;
+        }
+    }
+    std::string serialized_value_to_save = pb_to_save->SerializeAsString();
+    if (serialized_value_to_save.empty()) {
+        LOG(WARNING) << "failed to serialize, key=" << hex(key);
+        return http_json_reply(MetaServiceCode::UNDEFINED_ERR,
+                               fmt::format("failed to serialize, key={}", 
hex(key)));
+    }
+    // we need to remove the original KVs, it may be a range of keys
+    // and the number of final keys may be less than the initial number of keys
+    if (kv_found) value.remove(txn.get());
+
+    // TODO(gavin): use cloud::put() to deal with split-multi-kv and special 
keys
+    // StatsTabletKey is special, it has a series of keys, we only handle the 
base stat key
+    // MetaSchemaPBDictionaryKey, MetaSchemaKey, MetaDeleteBitmapKey are 
splited in to multiple KV
+    txn->put(key, serialized_value_to_save);
+    err = txn->commit();
+    if (err != TxnErrorCode::TXN_OK) {
+        std::stringstream ss;
+        ss << "failed to commit txn when set value, err=" << err << " key=" << 
hex(key);
+        LOG(WARNING) << ss.str();
+        return http_json_reply(MetaServiceCode::UNDEFINED_ERR, ss.str());
+    }
+    LOG(WARNING) << "set_value saved, key=" << hex(key);
+
+    std::stringstream final_json;
+    final_json << "original_value_hex=" << hex(value.value()) << "\n"
+               << "key_hex=" << hex(key) << "\n"
+               << "original_value_json=" << original_value_json << "\n";
+
+    return http_text_reply(MetaServiceCode::OK, "", final_json.str());
+}
+
 HttpResponse process_http_encode_key(const brpc::URI& uri) {
     std::string key;
     auto st = encode_key(uri, key);
diff --git a/cloud/src/meta-service/meta_service_http.cpp 
b/cloud/src/meta-service/meta_service_http.cpp
index 78f2dc03a95..40c5d796d91 100644
--- a/cloud/src/meta-service/meta_service_http.cpp
+++ b/cloud/src/meta-service/meta_service_http.cpp
@@ -158,7 +158,8 @@ HttpResponse http_json_reply(MetaServiceCode code, const 
std::string& msg,
     return {status_code, msg, sb.GetString()};
 }
 
-static std::string format_http_request(const brpc::HttpHeader& request) {
+static std::string format_http_request(brpc::Controller* cntl) {
+    const brpc::HttpHeader& request = cntl->http_request();
     auto& unresolved_path = request.unresolved_path();
     auto& uri = request.uri();
     std::stringstream ss;
@@ -173,6 +174,8 @@ static std::string format_http_request(const 
brpc::HttpHeader& request) {
     for (auto it = request.HeaderBegin(); it != request.HeaderEnd(); ++it) {
         ss << "\n" << it->first << ":" << it->second;
     }
+    std::string body = cntl->request_attachment().to_string();
+    ss << "\nbody=" << (body.empty() ? "(empty)" : body);
     return ss.str();
 }
 
@@ -509,6 +512,10 @@ static HttpResponse process_get_value(MetaServiceImpl* 
service, brpc::Controller
     return process_http_get_value(service->txn_kv().get(), 
ctrl->http_request().uri());
 }
 
+static HttpResponse process_set_value(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
+    return process_http_set_value(service->txn_kv().get(), ctrl);
+}
+
 // show all key ranges and their count.
 static HttpResponse process_show_meta_ranges(MetaServiceImpl* service, 
brpc::Controller* ctrl) {
     auto txn_kv = std::dynamic_pointer_cast<FdbTxnKv>(service->txn_kv());
@@ -737,6 +744,7 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
             {"decode_key", process_decode_key},
             {"encode_key", process_encode_key},
             {"get_value", process_get_value},
+            {"set_value", process_set_value},
             {"show_meta_ranges", process_show_meta_ranges},
             {"txn_lazy_commit", process_txn_lazy_commit},
             {"injection_point", process_injection_point},
@@ -744,11 +752,11 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
             {"v1/decode_key", process_decode_key},
             {"v1/encode_key", process_encode_key},
             {"v1/get_value", process_get_value},
+            {"v1/set_value", process_set_value},
             {"v1/show_meta_ranges", process_show_meta_ranges},
             {"v1/txn_lazy_commit", process_txn_lazy_commit},
             {"v1/injection_point", process_injection_point},
-            // for get
-            {"get_instance", process_get_instance_info},
+            {"v1/fix_tablet_stats", process_fix_tablet_stats},
             // for get
             {"get_instance", process_get_instance_info},
             {"get_obj_store_info", process_get_obj_store_info},
@@ -785,7 +793,7 @@ void 
MetaServiceImpl::http(::google::protobuf::RpcController* controller,
     // Prepare input request info
     LOG(INFO) << "rpc from " << cntl->remote_side()
               << " request: " << cntl->http_request().uri().path();
-    std::string http_request = format_http_request(cntl->http_request());
+    std::string http_request = format_http_request(cntl);
 
     // Auth
     auto token = http_query(cntl->http_request().uri(), "token");
diff --git a/cloud/src/meta-service/meta_service_http.h 
b/cloud/src/meta-service/meta_service_http.h
index ead53f0630f..1dca1d3d64d 100644
--- a/cloud/src/meta-service/meta_service_http.h
+++ b/cloud/src/meta-service/meta_service_http.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <brpc/controller.h>
 #include <brpc/uri.h>
 #include <gen_cpp/cloud.pb.h>
 
@@ -41,6 +42,8 @@ HttpResponse http_json_reply(MetaServiceCode code, const 
std::string& msg,
 
 HttpResponse process_http_get_value(TxnKv* txn_kv, const brpc::URI& uri);
 
+HttpResponse process_http_set_value(TxnKv* txn_kv, brpc::Controller* ctrl);
+
 HttpResponse process_http_encode_key(const brpc::URI& uri);
 
 /// Return the query value or an empty string if not exists.
diff --git a/cloud/test/meta_service_http_test.cpp 
b/cloud/test/meta_service_http_test.cpp
index 3afb2f66bf6..e51e6fa819b 100644
--- a/cloud/test/meta_service_http_test.cpp
+++ b/cloud/test/meta_service_http_test.cpp
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "meta-service/meta_service_http.h"
+
 #include <brpc/channel.h>
 #include <brpc/controller.h>
 #include <brpc/server.h>
@@ -1761,4 +1763,75 @@ TEST(MetaServiceHttpTest, UpdateConfig) {
     }
 }
 
+TEST(HttpEncodeKeyTest, ProcessHttpSetValue) {
+    auto txn_kv = std::make_shared<MemTxnKv>();
+    std::unique_ptr<Transaction> txn;
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+
+    // Create and serialize initial RowsetMeta
+    RowsetMetaCloudPB initial_rowset_meta;
+    initial_rowset_meta.set_rowset_id_v2("12345");
+    initial_rowset_meta.set_rowset_id(0);
+    initial_rowset_meta.set_tablet_id(67890);
+    initial_rowset_meta.set_num_rows(100);
+    initial_rowset_meta.set_data_disk_size(1024);
+    std::string serialized_initial = initial_rowset_meta.SerializeAsString();
+
+    // Generate proper rowset meta key
+    std::string instance_id = "test_instance";
+    int64_t tablet_id = 67890;
+    int64_t version = 10086;
+
+    // Generate proper rowset meta key
+    MetaRowsetKeyInfo key_info {instance_id, tablet_id, version};
+    std::string initial_key = meta_rowset_key(key_info);
+
+    // Store initial RowsetMeta in TxnKv
+    txn->put(initial_key, serialized_initial);
+    ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
+
+    // Create new RowsetMeta to update
+    RowsetMetaCloudPB new_rowset_meta;
+    new_rowset_meta.set_rowset_id_v2("12345");
+    new_rowset_meta.set_rowset_id(0);
+    new_rowset_meta.set_tablet_id(67890);
+    new_rowset_meta.set_num_rows(200);        // Updated row count
+    new_rowset_meta.set_data_disk_size(2048); // Updated size
+    std::string json_value = proto_to_json(new_rowset_meta);
+
+    // Initialize cntl URI with required parameters
+    brpc::URI cntl_uri;
+    cntl_uri._path = "/meta-service/http/set_value";
+    cntl_uri.SetQuery("key_type", "MetaRowsetKey");
+    cntl_uri.SetQuery("instance_id", instance_id);
+    cntl_uri.SetQuery("tablet_id", std::to_string(tablet_id));
+    cntl_uri.SetQuery("version", std::to_string(version));
+
+    brpc::Controller cntl;
+    cntl.request_attachment().append(json_value);
+    cntl.http_request().uri() = cntl_uri;
+
+    // Test update
+    auto response = process_http_set_value(txn_kv.get(), &cntl);
+    EXPECT_EQ(response.status_code, 200) << response.msg;
+    std::stringstream final_json;
+    final_json << "original_value_hex=" << 
hex(initial_rowset_meta.SerializeAsString()) << "\n"
+               << "key_hex=" << hex(initial_key) << "\n"
+               << "original_value_json=" << proto_to_json(initial_rowset_meta) 
<< "\n";
+    // std::cout << "xxx " << final_json.str() << std::endl;
+    EXPECT_EQ(response.body, final_json.str());
+
+    // Verify update
+    ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
+    std::string updated_value;
+    ASSERT_EQ(txn->get(initial_key, &updated_value), TxnErrorCode::TXN_OK);
+
+    RowsetMetaCloudPB updated_rowset_meta;
+    ASSERT_TRUE(updated_rowset_meta.ParseFromString(updated_value));
+    EXPECT_EQ(updated_rowset_meta.rowset_id_v2(), "12345");
+    EXPECT_EQ(updated_rowset_meta.tablet_id(), 67890);
+    EXPECT_EQ(updated_rowset_meta.num_rows(), 200);
+    EXPECT_EQ(updated_rowset_meta.data_disk_size(), 2048);
+}
+
 } // namespace doris::cloud


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to