gavinchou commented on code in PR #31415: URL: https://github.com/apache/doris/pull/31415#discussion_r1503746238
########## cloud/src/meta-service/meta_service_resource.cpp: ########## @@ -218,52 +271,158 @@ void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* contro return; } for (auto& obj_info : *instance.mutable_obj_info()) { - if (obj_info.has_encryption_info()) { - AkSkPair plain_ak_sk_pair; - int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(), - &plain_ak_sk_pair, code, msg); - if (ret != 0) return; - obj_info.set_ak(std::move(plain_ak_sk_pair.first)); - obj_info.set_sk(std::move(plain_ak_sk_pair.second)); + if (auto ret = decrypt_and_update_ak_sk(obj_info, code, msg); ret != 0) { + return; + } + } + // Iterate all the resources to return to the rpc caller + for (const auto& resource_id : instance.resource_ids()) { + std::string storage_vault_k; + storage_vault_key({instance_id, resource_id}, &storage_vault_k); + if (auto ret = fetch_all_storage_valut(std::move(storage_vault_k), instance_id, response, + txn_kv_.get(), code, msg); + ret != 0) { + return; } } response->mutable_obj_info()->CopyFrom(instance.obj_info()); } +static std::string next_avaiable_vault_id(const InstanceInfoPB& instance) { + std::string vault_id = "1"; + auto cmp = [](const std::string& prev, const auto& last) { + size_t prev_value = std::stoi(prev); + size_t last_id = 0; + if constexpr (std::is_same_v<decltype(last), ObjectStoreInfoPB>) { + last_id = std::stoi(last.id()); + } else if constexpr (std::is_same_v<decltype(last), std::string>) { + last_id = std::stoi(last); + } else { + static_assert("Should never come here"); + } + + return prev_value > last_id ? 
prev : std::to_string(last_id); + }; + ; + return std::accumulate( + instance.resource_ids().begin(), instance.resource_ids().end(), + std::accumulate(instance.obj_info().begin(), instance.obj_info().end(), vault_id, cmp), + cmp); +} + +static int add_hdfs_storage_valut(InstanceInfoPB& instance, TxnKv* txn_kv, + const AlterHdfsParams& hdfs_param, MetaServiceCode& code, + std::string& msg) { + std::string key; + std::string vault_id = next_avaiable_vault_id(instance); + storage_vault_key({instance.instance_id(), vault_id}, &key); + StorageVaultPB vault; + vault.set_id(vault_id); + vault.set_name(hdfs_param.vault_name()); + *vault.mutable_hdfs_infos() = hdfs_param.hdfs(); + std::string val = vault.SerializeAsString(); + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to create txn"; + LOG(WARNING) << msg << " err=" << err; + return -1; + } + err = txn->get(key, &val); + LOG(INFO) << "get instance_key=" << hex(key); + + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + msg = fmt::format("failed to get instance, instance_id={}, err={}", instance.instance_id(), + err); + return -1; + } + txn->put(key, val); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::COMMIT>(err); + msg = fmt::format("failed to commit for putting storage vault_id={}, vault_name={}, err={}", + vault_id, hdfs_param.vault_name(), err); + return -1; + } + instance.mutable_resource_ids()->Add(std::move(vault_id)); + *instance.mutable_storage_vault_names()->Add() = hdfs_param.vault_name(); + return 0; +} + +[[maybe_unused]] static int remove_hdfs_storage_valut(std::string_view vault_key, TxnKv* txn_kv, + MetaServiceCode& code, std::string& msg) { + std::unique_ptr<Transaction> txn; + TxnErrorCode err = txn_kv->create_txn(&txn); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::CREATE>(err); + msg = "failed to 
create txn"; + LOG(WARNING) << msg << " err=" << err; + return -1; + } + txn->remove(vault_key); + LOG(INFO) << "remove storage_vault_key=" << hex(vault_key); + + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::COMMIT>(err); + msg = fmt::format("failed to commit for removing storage vault_key={}, err={}", vault_key, + err); + return -1; + } + return 0; +} + void MetaServiceImpl::alter_obj_store_info(google::protobuf::RpcController* controller, const AlterObjStoreInfoRequest* request, AlterObjStoreInfoResponse* response, ::google::protobuf::Closure* done) { + std::string ak, sk, bucket, prefix, endpoint, external_endpoint, region; + EncryptionInfoPB encryption_info; + AkSkPair cipher_ak_sk_pair; RPC_PREPROCESS(alter_obj_store_info); - // Prepare data - if (!request->has_obj() || !request->obj().has_ak() || !request->obj().has_sk()) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "s3 obj info err " + proto_to_json(*request); - return; - } + switch (request->op()) { + case AlterObjStoreInfoRequest::ADD_OBJ_INFO: + case AlterObjStoreInfoRequest::LEGACY_UPDATE_AK_SK: + case AlterObjStoreInfoRequest::UPDATE_AK_SK: { + // Prepare data + if (!request->has_obj() || !request->obj().has_ak() || !request->obj().has_sk()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 obj info err " + proto_to_json(*request); + return; + } - auto& obj = request->obj(); - std::string plain_ak = obj.has_ak() ? obj.ak() : ""; - std::string plain_sk = obj.has_sk() ? obj.sk() : ""; + auto& obj = request->obj(); + std::string plain_ak = obj.has_ak() ? obj.ak() : ""; + std::string plain_sk = obj.has_sk() ? obj.sk() : ""; - EncryptionInfoPB encryption_info; - AkSkPair cipher_ak_sk_pair; - if (encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code, msg) != - 0) { - return; - } - const auto& [ak, sk] = cipher_ak_sk_pair; - std::string bucket = obj.has_bucket() ? obj.bucket() : ""; - std::string prefix = obj.has_prefix() ? 
obj.prefix() : ""; - std::string endpoint = obj.has_endpoint() ? obj.endpoint() : ""; - std::string external_endpoint = obj.has_external_endpoint() ? obj.external_endpoint() : ""; - std::string region = obj.has_region() ? obj.region() : ""; + if (encrypt_ak_sk_helper(plain_ak, plain_sk, &encryption_info, &cipher_ak_sk_pair, code, + msg) != 0) { + return; + } + ak = cipher_ak_sk_pair.first; + sk = cipher_ak_sk_pair.second; + bucket = obj.has_bucket() ? obj.bucket() : ""; + prefix = obj.has_prefix() ? obj.prefix() : ""; + endpoint = obj.has_endpoint() ? obj.endpoint() : ""; + external_endpoint = obj.has_external_endpoint() ? obj.external_endpoint() : ""; + region = obj.has_region() ? obj.region() : ""; - // obj size > 1k, refuse - if (obj.ByteSizeLong() > 1024) { - code = MetaServiceCode::INVALID_ARGUMENT; - msg = "s3 obj info greater than 1k " + proto_to_json(*request); - return; + // obj size > 1k, refuse + if (obj.ByteSizeLong() > 1024) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "s3 obj info greater than 1k " + proto_to_json(*request); + return; + }; + } break; + case AlterObjStoreInfoRequest::ADD_HDFS_INFO: { + if (!request->has_hdfs()) { + code = MetaServiceCode::INVALID_ARGUMENT; + msg = "hdfs info is not found " + proto_to_json(*request); + return; + } + } break; + case AlterObjStoreInfoRequest::UNKNOWN: + break; Review Comment: any error/warning msgs if we reach this branch? 
It seems unexpected. ########## cloud/src/meta-service/meta_service_resource.cpp: ########## @@ -218,52 +271,158 @@ void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* contro return; } for (auto& obj_info : *instance.mutable_obj_info()) { - if (obj_info.has_encryption_info()) { - AkSkPair plain_ak_sk_pair; - int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(), - &plain_ak_sk_pair, code, msg); - if (ret != 0) return; - obj_info.set_ak(std::move(plain_ak_sk_pair.first)); - obj_info.set_sk(std::move(plain_ak_sk_pair.second)); + if (auto ret = decrypt_and_update_ak_sk(obj_info, code, msg); ret != 0) { + return; + } + } + // Iterate all the resources to return to the rpc caller + for (const auto& resource_id : instance.resource_ids()) { + std::string storage_vault_k; + storage_vault_key({instance_id, resource_id}, &storage_vault_k); + if (auto ret = fetch_all_storage_valut(std::move(storage_vault_k), instance_id, response, + txn_kv_.get(), code, msg); + ret != 0) { + return; } } response->mutable_obj_info()->CopyFrom(instance.obj_info()); } +static std::string next_avaiable_vault_id(const InstanceInfoPB& instance) { Review Comment: This function lacks comments and unit tests (UT); it should be carefully tested. 
########## cloud/src/meta-service/meta_service_resource.cpp: ########## @@ -218,52 +271,158 @@ void MetaServiceImpl::get_obj_store_info(google::protobuf::RpcController* contro return; } for (auto& obj_info : *instance.mutable_obj_info()) { - if (obj_info.has_encryption_info()) { - AkSkPair plain_ak_sk_pair; - int ret = decrypt_ak_sk_helper(obj_info.ak(), obj_info.sk(), obj_info.encryption_info(), - &plain_ak_sk_pair, code, msg); - if (ret != 0) return; - obj_info.set_ak(std::move(plain_ak_sk_pair.first)); - obj_info.set_sk(std::move(plain_ak_sk_pair.second)); + if (auto ret = decrypt_and_update_ak_sk(obj_info, code, msg); ret != 0) { + return; + } + } + // Iterate all the resources to return to the rpc caller + for (const auto& resource_id : instance.resource_ids()) { + std::string storage_vault_k; + storage_vault_key({instance_id, resource_id}, &storage_vault_k); Review Comment: Please use the return-value form, i.e. `key = storage_vault_key(...)`, instead of passing an output parameter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org