This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new d1d52ae68c8 [feature](compaction) Add an http action for visibility of compaction score on each tablet (#38489) (#40826) d1d52ae68c8 is described below commit d1d52ae68c8a372f21b19a70f604f30d712a927a Author: Siyang Tang <82279870+tangsiyang2...@users.noreply.github.com> AuthorDate: Sat Sep 21 20:35:55 2024 +0800 [feature](compaction) Add an http action for visibility of compaction score on each tablet (#38489) (#40826) pick: #38489 Usage: 1. `curl http://be_ip:be_host/api/compaction_score?top_n=10` Returns a json object contains compaction score for top n, n=top_n. ``` [ { "compaction_score": "5", "tablet_id": "42595" }, { "compaction_score": "5", "tablet_id": "42587" }, { "compaction_score": "5", "tablet_id": "42593" }, { "compaction_score": "5", "tablet_id": "42597" }, { "compaction_score": "5", "tablet_id": "42589" }, { "compaction_score": "5", "tablet_id": "42599" }, { "compaction_score": "5", "tablet_id": "42601" }, { "compaction_score": "5", "tablet_id": "42591" }, { "compaction_score": "5", "tablet_id": "42585" }, { "compaction_score": "4", "tablet_id": "10034" } ] ``` If top_n is not specified, return all compaction score for all tablets. If top_n is illegal, raise an error. ``` invalid argument: top_n=wrong ``` 2. `curl http://be_ip:be_host/api/compaction_score?sync_meta=true` `sync_meta` is only available on cloud mode, will sync meta from meta service. It can cooperate with top_n. If add param `sync_meta` on non-cloud mode, will raise an error. ``` sync meta is only available for cloud mode ``` 3. In the future, this endpoint may extend other utility, like fetching tablet compaction score by table id, etc. ## Proposed changes Issue Number: close #xxx <!--Describe your changes.--> --- be/src/http/action/compaction_score_action.cpp | 160 +++++++++++++++++++++ be/src/http/action/compaction_score_action.h | 62 ++++++++ be/src/olap/base_tablet.cpp | 8 ++ be/src/olap/base_tablet.h | 4 + be/src/olap/tablet.cpp | 3 + be/src/service/http_service.cpp | 8 ++ .../compaction/test_compaction_score_action.groovy | 53 +++++++ 7 files changed, 298 insertions(+) diff --git a/be/src/http/action/compaction_score_action.cpp b/be/src/http/action/compaction_score_action.cpp new file mode 100644 index 00000000000..8c89d34221d --- /dev/null +++ b/be/src/http/action/compaction_score_action.cpp @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "http/action/compaction_score_action.h" + +#include <gen_cpp/FrontendService_types.h> +#include <gen_cpp/Types_types.h> +#include <glog/logging.h> +#include <rapidjson/document.h> +#include <rapidjson/prettywriter.h> +#include <rapidjson/stringbuffer.h> + +#include <algorithm> +#include <cstddef> +#include <cstdint> +#include <exception> +#include <functional> +#include <iterator> +#include <limits> +#include <memory> +#include <span> +#include <stdexcept> +#include <string> +#include <string_view> +#include <vector> + +#include "common/status.h" +#include "http/http_channel.h" +#include "http/http_handler_with_auth.h" +#include "http/http_headers.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "olap/tablet_fwd.h" +#include "olap/tablet_manager.h" +#include "util/stopwatch.hpp" + +namespace doris { + +const std::string TOP_N = "top_n"; +const std::string COMPACTION_SCORE = "compaction_score"; +constexpr size_t DEFAULT_TOP_N = std::numeric_limits<size_t>::max(); +constexpr std::string_view TABLET_ID = "tablet_id"; + +template <typename T> +concept CompactionScoreAccessble = requires(T t) { + { t.get_real_compaction_score() } -> std::same_as<uint32_t>; +}; + +template <CompactionScoreAccessble T> +std::vector<CompactionScoreResult> calculate_compaction_scores( + std::span<std::shared_ptr<T>> tablets) { + std::vector<CompactionScoreResult> result; + result.reserve(tablets.size()); + std::ranges::transform(tablets, std::back_inserter(result), + [](const std::shared_ptr<T>& tablet) -> CompactionScoreResult { + return {.tablet_id = tablet->tablet_id(), + .compaction_score = tablet->get_real_compaction_score()}; + }); + return result; +} + +struct LocalCompactionScoreAccessor final : CompactionScoresAccessor { + LocalCompactionScoreAccessor(TabletManager* tablet_mgr) : tablet_mgr(tablet_mgr) {} + + std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() override { + auto tablets = tablet_mgr->get_all_tablet(); + std::span<TabletSharedPtr> s = {tablets.begin(), tablets.end()}; + return calculate_compaction_scores(s); + } + + TabletManager* tablet_mgr; +}; + +static rapidjson::Value jsonfy_tablet_compaction_score( + const CompactionScoreResult& result, rapidjson::MemoryPoolAllocator<>& allocator) { + rapidjson::Value node; + node.SetObject(); + + rapidjson::Value tablet_id_key; + tablet_id_key.SetString(TABLET_ID.data(), TABLET_ID.length(), allocator); + rapidjson::Value tablet_id_val; + auto tablet_id_str = std::to_string(result.tablet_id); + tablet_id_val.SetString(tablet_id_str.c_str(), tablet_id_str.length(), allocator); + + rapidjson::Value score_key; + score_key.SetString(COMPACTION_SCORE.data(), COMPACTION_SCORE.size()); + rapidjson::Value score_val; + auto score_str = std::to_string(result.compaction_score); + score_val.SetString(score_str.c_str(), score_str.length(), allocator); + node.AddMember(score_key, score_val, allocator); + + node.AddMember(tablet_id_key, tablet_id_val, allocator); + return node; +} + +CompactionScoreAction::CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, TabletManager* tablet_mgr) + : HttpHandlerWithAuth(exec_env, hier, type), + _accessor(std::make_unique<LocalCompactionScoreAccessor>(tablet_mgr)) {} + +void CompactionScoreAction::handle(HttpRequest* req) { + req->add_output_header(HttpHeaders::CONTENT_TYPE, HttpHeaders::JsonType.data()); + auto top_n_param = req->param(TOP_N); + + size_t top_n = DEFAULT_TOP_N; + if (!top_n_param.empty()) { + try { + auto tmp_top_n = std::stoll(top_n_param); + if (tmp_top_n < 0) { + throw std::invalid_argument("`top_n` cannot less than 0"); + } + top_n = tmp_top_n; + } catch (const std::exception& e) { + LOG(WARNING) << "convert failed:" << e.what(); + auto msg = fmt::format("invalid argument: top_n={}", top_n_param); + HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, msg); + return; + } + } + + std::string result; + if (auto st = _handle(top_n, &result); !st) { + HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, st.to_json()); + return; + } + HttpChannel::send_reply(req, HttpStatus::OK, result); +} + +Status CompactionScoreAction::_handle(size_t top_n, std::string* result) { + auto scores = _accessor->get_all_tablet_compaction_scores(); + top_n = std::min(top_n, scores.size()); + std::partial_sort(scores.begin(), scores.begin() + top_n, scores.end(), std::greater<>()); + + rapidjson::Document root; + root.SetArray(); + auto& allocator = root.GetAllocator(); + std::for_each(scores.begin(), scores.begin() + top_n, [&](const auto& score) { + root.PushBack(jsonfy_tablet_compaction_score(score, allocator), allocator); + }); + rapidjson::StringBuffer str_buf; + rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(str_buf); + root.Accept(writer); + *result = str_buf.GetString(); + return Status::OK(); +} + +} // namespace doris diff --git a/be/src/http/action/compaction_score_action.h b/be/src/http/action/compaction_score_action.h new file mode 100644 index 00000000000..3c5c619f245 --- /dev/null +++ b/be/src/http/action/compaction_score_action.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <gen_cpp/FrontendService_types.h> + +#include <cstddef> +#include <memory> +#include <string> + +#include "common/status.h" +#include "http/http_handler_with_auth.h" +#include "http/http_request.h" +#include "olap/storage_engine.h" +#include "runtime/exec_env.h" +namespace doris { + +struct CompactionScoreResult { + int64_t tablet_id; + size_t compaction_score; +}; + +inline bool operator>(const CompactionScoreResult& lhs, const CompactionScoreResult& rhs) { + return lhs.compaction_score > rhs.compaction_score; +} + +struct CompactionScoresAccessor { + virtual ~CompactionScoresAccessor() = default; + + virtual std::vector<CompactionScoreResult> get_all_tablet_compaction_scores() = 0; +}; + +// topn, sync +class CompactionScoreAction : public HttpHandlerWithAuth { +public: + explicit CompactionScoreAction(ExecEnv* exec_env, TPrivilegeHier::type hier, + TPrivilegeType::type type, TabletManager* tablet_mgr); + + void handle(HttpRequest* req) override; + +private: + Status _handle(size_t top_n, std::string* result); + + std::unique_ptr<CompactionScoresAccessor> _accessor; +}; + +} // namespace doris diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index c67c83bbec8..3d8ef137387 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -83,4 +83,12 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_ return Status::OK(); } +uint32_t BaseTablet::get_real_compaction_score() const { + const auto& rs_metas = _tablet_meta->all_rs_metas(); + return std::accumulate(rs_metas.begin(), rs_metas.end(), 0, + [](uint32_t score, const RowsetMetaSharedPtr& rs_meta) { + return score + rs_meta->get_compaction_score(); + }); +} + } /* namespace doris */ diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 4338986efe6..4fdc81aa985 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -87,6 +87,10 @@ public: virtual size_t tablet_footprint() = 0; + // this method just return the compaction sum on each rowset + // note(tsy): we should unify the compaction score calculation finally + uint32_t get_real_compaction_score() const; + protected: mutable std::shared_mutex _meta_lock; const TabletMetaSharedPtr _tablet_meta; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index dede9f2600f..ad0cc795dc3 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1147,6 +1147,9 @@ uint32_t Tablet::calc_cold_data_compaction_score() const { uint32_t Tablet::_calc_cumulative_compaction_score( std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) { + if (cumulative_compaction_policy == nullptr) [[unlikely]] { + return 0; + } #ifndef BE_TEST if (_cumulative_compaction_policy == nullptr || _cumulative_compaction_policy->name() != cumulative_compaction_policy->name()) { diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 92ee66ec6f7..a46e9c569d5 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -19,6 +19,7 @@ #include <event2/bufferevent.h> #include <event2/http.h> +#include <gen_cpp/FrontendService_types.h> #include <string> #include <vector> @@ -33,6 +34,7 @@ #include "http/action/checksum_action.h" #include "http/action/clear_cache_action.h" #include "http/action/compaction_action.h" +#include "http/action/compaction_score_action.h" #include "http/action/config_action.h" #include "http/action/debug_point_action.h" #include "http/action/download_action.h" @@ -258,6 +260,12 @@ Status HttpService::start() { SnapshotAction* snapshot_action = _pool.add(new SnapshotAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN)); _ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action); + + CompactionScoreAction* compaction_score_action = + _pool.add(new CompactionScoreAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN, + _env->get_storage_engine()->tablet_manager())); + _ev_http_server->register_handler(HttpMethod::GET, "/api/compaction_score", + compaction_score_action); #endif // 2 compaction actions diff --git a/regression-test/suites/compaction/test_compaction_score_action.groovy b/regression-test/suites/compaction/test_compaction_score_action.groovy new file mode 100644 index 00000000000..9ab8743778f --- /dev/null +++ b/regression-test/suites/compaction/test_compaction_score_action.groovy @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_compaction_score_action") { + def tableName = "test_compaction_score_action"; + + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + id INT NOT NULL, + name STRING NOT NULL + ) DUPLICATE KEY (`id`) + PROPERTIES ("replication_num" = "1", "disable_auto_compaction" = "true"); + """ + for (i in 0..<30) { + sql """ INSERT INTO ${tableName} VALUES(1, "Vedal") """ + sql """ INSERT INTO ${tableName} VALUES(2, "Neuro") """ + sql """ INSERT INTO ${tableName} VALUES(3, "Evil") """ + } + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + for (int i=0;i<backendId_to_backendIP.size();i++){ + def beHttpAddress =backendId_to_backendIP.entrySet()[i].getValue()+":"+backendId_to_backendHttpPort.entrySet()[i].getValue() + if (isCloudMode()) { + def (code, text, err) = curl("GET",beHttpAddress+ "/api/compaction_score?top_n=1&sync_meta=true") + def score_str = parseJson(text).get(0).get("compaction_score") + def score = Integer.parseInt(score_str) + assertTrue(score >= 90) + } else { + def (code, text, err) = curl("GET",beHttpAddress+"/api/compaction_score?top_n=1") + def score_str = parseJson(text).get(0).get("compaction_score") + def score = Integer.parseInt(score_str) + assertTrue(score >= 90) + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org