github-actions[bot] commented on code in PR #29965: URL: https://github.com/apache/doris/pull/29965#discussion_r1451986923
########## be/src/cloud/cloud_meta_mgr.cpp: ########## @@ -240,50 +295,195 @@ return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not implemented"); } -Status CloudMetaMgr::commit_txn(StreamLoadContext* ctx, bool is_2pc) { - return Status::NotSupported("CloudMetaMgr::commit_txn is not implemented"); +Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { + VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id + << ", label: " << ctx.label << ", is_2pc: " << is_2pc; + CommitTxnRequest req; + CommitTxnResponse res; + req.set_cloud_unique_id(config::cloud_unique_id); + req.set_db_id(ctx.db_id); + req.set_txn_id(ctx.txn_id); + req.set_is_2pc(is_2pc); + return retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn); } -Status CloudMetaMgr::abort_txn(StreamLoadContext* ctx) { - return Status::NotSupported("CloudMetaMgr::abort_txn is not implemented"); +Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { Review Comment: warning: method 'abort_txn' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_meta_mgr.h:65: ```diff - Status abort_txn(const StreamLoadContext& ctx); + static Status abort_txn(const StreamLoadContext& ctx); ``` ########## be/src/cloud/cloud_meta_mgr.cpp: ########## @@ -226,12 +281,12 @@ return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is not implemented"); } -Status CloudMetaMgr::prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp, +Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp, Review Comment: warning: method 'prepare_rowset' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_meta_mgr.h:55: ```diff - Status prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp, + static Status prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp, ``` ########## be/src/cloud/cloud_meta_mgr.cpp: ########## @@ -226,12 +281,12 @@ return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is not implemented"); } -Status CloudMetaMgr::prepare_rowset(const RowsetMeta* rs_meta, bool is_tmp, +Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp, RowsetMetaSharedPtr* existed_rs_meta) { return Status::NotSupported("CloudMetaMgr::prepare_rowset is not implemented"); } -Status CloudMetaMgr::commit_rowset(const RowsetMeta* rs_meta, bool is_tmp, +Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta, bool is_tmp, Review Comment: warning: method 'commit_rowset' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_meta_mgr.h:58: ```diff - Status commit_rowset(const RowsetMeta& rs_meta, bool is_tmp, + static Status commit_rowset(const RowsetMeta& rs_meta, bool is_tmp, ``` ########## be/src/cloud/cloud_meta_mgr.cpp: ########## @@ -240,50 +295,195 @@ return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not implemented"); } -Status CloudMetaMgr::commit_txn(StreamLoadContext* ctx, bool is_2pc) { - return Status::NotSupported("CloudMetaMgr::commit_txn is not implemented"); +Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { + VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id + << ", label: " << ctx.label << ", is_2pc: " << is_2pc; + CommitTxnRequest req; + CommitTxnResponse res; + req.set_cloud_unique_id(config::cloud_unique_id); + req.set_db_id(ctx.db_id); + req.set_txn_id(ctx.txn_id); + req.set_is_2pc(is_2pc); + return retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn); } -Status CloudMetaMgr::abort_txn(StreamLoadContext* ctx) { - return Status::NotSupported("CloudMetaMgr::abort_txn is not implemented"); +Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { + VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id + << ", label: " << ctx.label; + AbortTxnRequest req; + AbortTxnResponse res; + req.set_cloud_unique_id(config::cloud_unique_id); + if (ctx.db_id > 0 && !ctx.label.empty()) { + req.set_db_id(ctx.db_id); + req.set_label(ctx.label); + } else { + req.set_txn_id(ctx.txn_id); + } + return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn); } -Status CloudMetaMgr::precommit_txn(StreamLoadContext* ctx) { - return Status::NotSupported("CloudMetaMgr::precommit_txn is not implemented"); +Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) { + VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id + << ", label: " << ctx.label; + PrecommitTxnRequest req; + PrecommitTxnResponse res; + req.set_cloud_unique_id(config::cloud_unique_id); + req.set_db_id(ctx.db_id); + req.set_txn_id(ctx.txn_id); + return retry_rpc("precommit txn", req, &res, &MetaService_Stub::precommit_txn); } Status CloudMetaMgr::get_s3_info(std::vector<std::tuple<std::string, S3Conf>>* s3_infos) { - return Status::NotSupported("CloudMetaMgr::get_s3_info is not implemented"); + GetObjStoreInfoRequest req; + GetObjStoreInfoResponse resp; + req.set_cloud_unique_id(config::cloud_unique_id); + Status s = retry_rpc("get s3 info", req, &resp, &MetaService_Stub::get_obj_store_info); + if (!s.ok()) { + return s; + } + + for (const auto& obj_store : resp.obj_info()) { + S3Conf s3_conf; + s3_conf.ak = obj_store.ak(); + s3_conf.sk = obj_store.sk(); + s3_conf.endpoint = obj_store.endpoint(); + s3_conf.region = obj_store.region(); + s3_conf.bucket = obj_store.bucket(); + s3_conf.prefix = obj_store.prefix(); + s3_conf.sse_enabled = obj_store.sse_enabled(); + s3_conf.provider = obj_store.provider(); + s3_infos->emplace_back(obj_store.id(), std::move(s3_conf)); + } + return Status::OK(); } Status CloudMetaMgr::prepare_tablet_job(const TabletJobInfoPB& job, StartTabletJobResponse* res) { - return Status::NotSupported("CloudMetaMgr::prepare_tablet_job is not implemented"); + VLOG_DEBUG << "prepare_tablet_job: " << job.ShortDebugString(); + TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_tablet_job", Status::OK(), job, res); + + StartTabletJobRequest req; + req.mutable_job()->CopyFrom(job); + req.set_cloud_unique_id(config::cloud_unique_id); + return retry_rpc("start tablet job", req, res, &MetaService_Stub::start_tablet_job); } Status CloudMetaMgr::commit_tablet_job(const TabletJobInfoPB& job, FinishTabletJobResponse* res) { - return Status::NotSupported("CloudMetaMgr::commit_tablet_job is not implemented"); + VLOG_DEBUG << "commit_tablet_job: " << job.ShortDebugString(); + TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_tablet_job", Status::OK(), job, res); + + FinishTabletJobRequest req; + req.mutable_job()->CopyFrom(job); + req.set_action(FinishTabletJobRequest::COMMIT); + req.set_cloud_unique_id(config::cloud_unique_id); + return retry_rpc("commit tablet job", req, res, &MetaService_Stub::finish_tablet_job); } Status CloudMetaMgr::abort_tablet_job(const TabletJobInfoPB& job) { - return Status::NotSupported("CloudMetaMgr::alter_tablet_job is not implemented"); + VLOG_DEBUG << "abort_tablet_job: " << job.ShortDebugString(); + FinishTabletJobRequest req; + FinishTabletJobResponse res; + req.mutable_job()->CopyFrom(job); + req.set_action(FinishTabletJobRequest::ABORT); + req.set_cloud_unique_id(config::cloud_unique_id); + return retry_rpc("abort tablet job", req, &res, &MetaService_Stub::finish_tablet_job); } Status CloudMetaMgr::lease_tablet_job(const TabletJobInfoPB& job) { - return Status::NotSupported("CloudMetaMgr::lease_tablet_job is not implemented"); + VLOG_DEBUG << "lease_tablet_job: " << job.ShortDebugString(); + FinishTabletJobRequest req; + FinishTabletJobResponse res; + req.mutable_job()->CopyFrom(job); + req.set_action(FinishTabletJobRequest::LEASE); + req.set_cloud_unique_id(config::cloud_unique_id); + return retry_rpc("lease tablet job", req, &res, &MetaService_Stub::finish_tablet_job); } -Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const TabletSchema* tablet_schema) { - return Status::NotSupported("CloudMetaMgr::update_tablet_schema is not implemented"); +Status CloudMetaMgr::update_tablet_schema(int64_t tablet_id, const TabletSchema& tablet_schema) { Review Comment: warning: method 'update_tablet_schema' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_meta_mgr.h:79: ```diff - Status update_tablet_schema(int64_t tablet_id, const TabletSchema& tablet_schema); + static Status update_tablet_schema(int64_t tablet_id, const TabletSchema& tablet_schema); ``` ########## be/src/cloud/cloud_meta_mgr.cpp: ########## @@ -240,50 +295,195 @@ return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not implemented"); } -Status CloudMetaMgr::commit_txn(StreamLoadContext* ctx, bool is_2pc) { - return Status::NotSupported("CloudMetaMgr::commit_txn is not implemented"); +Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { + VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id + << ", label: " << ctx.label << ", is_2pc: " << is_2pc; + CommitTxnRequest req; + CommitTxnResponse res; + req.set_cloud_unique_id(config::cloud_unique_id); + req.set_db_id(ctx.db_id); + req.set_txn_id(ctx.txn_id); + req.set_is_2pc(is_2pc); + return retry_rpc("commit txn", req, &res, &MetaService_Stub::commit_txn); } -Status CloudMetaMgr::abort_txn(StreamLoadContext* ctx) { - return Status::NotSupported("CloudMetaMgr::abort_txn is not implemented"); +Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) { + VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id + << ", label: " << ctx.label; + AbortTxnRequest req; + AbortTxnResponse res; + req.set_cloud_unique_id(config::cloud_unique_id); + if (ctx.db_id > 0 && !ctx.label.empty()) { + req.set_db_id(ctx.db_id); + req.set_label(ctx.label); + } else { + req.set_txn_id(ctx.txn_id); + } + return retry_rpc("abort txn", req, &res, &MetaService_Stub::abort_txn); } -Status CloudMetaMgr::precommit_txn(StreamLoadContext* ctx) { - return Status::NotSupported("CloudMetaMgr::precommit_txn is not implemented"); +Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) { Review Comment: warning: method 'precommit_txn' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_meta_mgr.h:67: ```diff - Status precommit_txn(const StreamLoadContext& ctx); + static Status precommit_txn(const StreamLoadContext& ctx); ``` ########## be/src/cloud/cloud_meta_mgr.cpp: ########## @@ -166,53 +169,105 @@ class MetaServiceProxy { std::shared_ptr<MetaService_Stub> _stub; }; -Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) { - VLOG_DEBUG << "send GetTabletRequest, tablet_id: " << tablet_id; - TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::get_tablet_meta", Status::OK(), tablet_id, - tablet_meta); +template <typename T, typename... Ts> +struct is_any : std::disjunction<std::is_same<T, Ts>...> {}; + +template <typename T, typename... Ts> +constexpr bool is_any_v = is_any<T, Ts...>::value; + +template <typename Request> +static std::string debug_info(const Request& req) { + if constexpr (is_any_v<Request, CommitTxnRequest, AbortTxnRequest, PrecommitTxnRequest>) { + return fmt::format(" txn_id={}", req.txn_id()); + } else if constexpr (is_any_v<Request, StartTabletJobRequest, FinishTabletJobRequest>) { + return fmt::format(" tablet_id={}", req.job().idx().tablet_id()); + } else if constexpr (is_any_v<Request, UpdateDeleteBitmapRequest>) { + return fmt::format(" tablet_id={}, lock_id={}", req.tablet_id(), req.lock_id()); + } else if constexpr (is_any_v<Request, GetDeleteBitmapUpdateLockRequest>) { + return fmt::format(" table_id={}, lock_id={}", req.table_id(), req.lock_id()); + } else if constexpr (is_any_v<Request, GetTabletRequest>) { + return fmt::format(" tablet_id={}", req.tablet_id()); + } else if constexpr (is_any_v<Request, GetObjStoreInfoRequest>) { + return ""; + } else { + static_assert(!sizeof(Request)); + } +} + +static inline std::default_random_engine make_random_engine() { + return std::default_random_engine( + static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count())); +} +template <typename Request, typename Response> +using MetaServiceMethod = void (MetaService_Stub::*)(::google::protobuf::RpcController*, + const Request*, Response*, + ::google::protobuf::Closure*); + +template <typename Request, typename Response> +static Status retry_rpc(std::string_view op_name, const Request& req, Response* res, + MetaServiceMethod<Request, Response> method) { + static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); + static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); + + int retry_times = 0; + uint32_t duration_ms = 0; + std::string error_msg; + std::default_random_engine rng = make_random_engine(); + std::uniform_int_distribution<uint32_t> u(20, 200); + std::uniform_int_distribution<uint32_t> u2(500, 1000); std::shared_ptr<MetaService_Stub> stub; RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub)); - - int tried = 0; while (true) { brpc::Controller cntl; cntl.set_timeout_ms(config::meta_service_brpc_timeout_ms); - GetTabletRequest req; - GetTabletResponse resp; - req.set_cloud_unique_id(config::cloud_unique_id); - req.set_tablet_id(tablet_id); - stub->get_tablet(&cntl, &req, &resp, nullptr); - int retry_times = config::meta_service_rpc_retry_times; - if (cntl.Failed()) { - if (tried++ < retry_times) { - auto rng = std::default_random_engine(static_cast<uint32_t>( - std::chrono::steady_clock::now().time_since_epoch().count())); - std::uniform_int_distribution<uint32_t> u(20, 200); - std::uniform_int_distribution<uint32_t> u1(500, 1000); - uint32_t duration_ms = tried >= 100 ? u(rng) : u1(rng); - std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); - LOG_INFO("failed to get tablet meta") - .tag("reason", cntl.ErrorText()) - .tag("tablet_id", tablet_id) - .tag("tried", tried) - .tag("sleep", duration_ms); - continue; - } - return Status::RpcError("failed to get tablet meta: {}", cntl.ErrorText()); + cntl.set_max_retry(BRPC_RETRY_TIMES); + res->Clear(); + (stub.get()->*method)(&cntl, &req, res, nullptr); + if (UNLIKELY(cntl.Failed())) { + error_msg = cntl.ErrorText(); + } else if (res->status().code() == MetaServiceCode::OK) { + return Status::OK(); + } else if (res->status().code() != MetaServiceCode::KV_TXN_CONFLICT) { + return Status::Error<ErrorCode::INTERNAL_ERROR, false>("failed to {}: {}", op_name, + res->status().msg()); + } else { + error_msg = res->status().msg(); } + + if (++retry_times > config::meta_service_rpc_retry_times) { + break; + } + + duration_ms = retry_times <= 100 ? u(rng) : u2(rng); + LOG(WARNING) << "failed to " << op_name << debug_info(req) << " retry_times=" << retry_times + << " sleep=" << duration_ms << "ms : " << cntl.ErrorText(); + bthread_usleep(duration_ms * 1000); + } + return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name, error_msg); +} + +Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) { Review Comment: warning: method 'get_tablet_meta' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_meta_mgr.h:51: ```diff - Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* tablet_meta); + static Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* tablet_meta); ``` ########## be/src/cloud/cloud_meta_mgr.cpp: ########## @@ -240,50 +295,195 @@ return Status::NotSupported("CloudMetaMgr::update_tmp_rowset is not implemented"); } -Status CloudMetaMgr::commit_txn(StreamLoadContext* ctx, bool is_2pc) { - return Status::NotSupported("CloudMetaMgr::commit_txn is not implemented"); +Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) { Review Comment: warning: method 'commit_txn' can be made static [readability-convert-member-functions-to-static] be/src/cloud/cloud_meta_mgr.h:63: ```diff - Status commit_txn(const StreamLoadContext& ctx, bool is_2pc); + static Status commit_txn(const StreamLoadContext& ctx, bool is_2pc); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org