This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 246da2de99b branch-3.0: [enhance](metrics)add metrics to show
compaction task num #50706 (#50883)
246da2de99b is described below
commit 246da2de99b05dd23cf2c8cde1512da363ac0531
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 14 20:59:42 2025 +0800
branch-3.0: [enhance](metrics)add metrics to show compaction task num
#50706 (#50883)
Cherry-picked from #50706
Co-authored-by: koarz <[email protected]>
---
be/src/cloud/cloud_storage_engine.cpp | 16 ++++
be/src/olap/olap_server.cpp | 25 ++++++
be/src/util/doris_metrics.cpp | 16 ++++
be/src/util/doris_metrics.h | 5 ++
be/test/olap/compaction_metrics_test.cpp | 141 +++++++++++++++++++++++++++++++
5 files changed, 203 insertions(+)
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index 929f5480538..01d9205a33a 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -681,11 +681,17 @@ Status
CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
_submitted_base_compactions[tablet->tablet_id()] = compaction;
}
st = _base_compaction_thread_pool->submit_func([=, this, compaction =
std::move(compaction)]() {
+
DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
+
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+ _base_compaction_thread_pool->get_queue_size());
g_base_compaction_running_task_count << 1;
signal::tablet_id = tablet->tablet_id();
Defer defer {[&]() {
g_base_compaction_running_task_count << -1;
_submitted_base_compactions.erase(tablet->tablet_id());
+
DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
+
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+ _base_compaction_thread_pool->get_queue_size());
}};
auto st =
_request_tablet_global_compaction_lock(ReaderType::READER_BASE_COMPACTION,
tablet,
compaction);
@@ -699,6 +705,8 @@ Status
CloudStorageEngine::_submit_base_compaction_task(const CloudTabletSPtr& t
std::lock_guard lock(_compaction_mtx);
_executing_base_compactions.erase(tablet->tablet_id());
});
+ DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+ _base_compaction_thread_pool->get_queue_size());
if (!st.ok()) {
std::lock_guard lock(_compaction_mtx);
_submitted_base_compactions.erase(tablet->tablet_id());
@@ -777,6 +785,9 @@ Status
CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
}
};
st = _cumu_compaction_thread_pool->submit_func([=, this, compaction =
std::move(compaction)]() {
+
DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
+
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+ _cumu_compaction_thread_pool->get_queue_size());
DBUG_EXECUTE_IF("CloudStorageEngine._submit_cumulative_compaction_task.wait_in_line",
{ sleep(5); })
signal::tablet_id = tablet->tablet_id();
@@ -792,6 +803,9 @@ Status
CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
}
g_cumu_compaction_running_task_count << -1;
erase_submitted_cumu_compaction();
+
DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1);
+
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+ _cumu_compaction_thread_pool->get_queue_size());
}};
auto st =
_request_tablet_global_compaction_lock(ReaderType::READER_CUMULATIVE_COMPACTION,
tablet, compaction);
@@ -845,6 +859,8 @@ Status
CloudStorageEngine::_submit_cumulative_compaction_task(const CloudTabletS
}
erase_executing_cumu_compaction();
});
+
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+ _cumu_compaction_thread_pool->get_queue_size());
if (!st.ok()) {
erase_submitted_cumu_compaction();
return Status::InternalError("failed to submit cumu compaction,
tablet_id={}",
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 41cc66f0c13..6410fd421f1 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -82,6 +82,7 @@
#include "util/debug_points.h"
#include "util/doris_metrics.h"
#include "util/mem_info.h"
+#include "util/metrics.h"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
@@ -1049,6 +1050,15 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
<< ", num_total_queued_tasks: " <<
thread_pool->get_queue_size();
auto st = thread_pool->submit_func([tablet, compaction =
std::move(compaction),
compaction_type, permits, force,
this]() {
+ if (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
[[likely]] {
+
DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1);
+
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+ _cumu_compaction_thread_pool->get_queue_size());
+ } else if (compaction_type == CompactionType::BASE_COMPACTION) {
+
DorisMetrics::instance()->base_compaction_task_running_total->increment(1);
+
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+ _base_compaction_thread_pool->get_queue_size());
+ }
bool is_large_task = true;
Defer defer {[&]() {
DBUG_EXECUTE_IF("StorageEngine._submit_compaction_task.sleep",
{ sleep(5); })
@@ -1063,6 +1073,14 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
if (!is_large_task) {
_cumu_compaction_thread_pool_small_tasks_running--;
}
+
DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(
+ -1);
+
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+ _cumu_compaction_thread_pool->get_queue_size());
+ } else if (compaction_type == CompactionType::BASE_COMPACTION)
{
+
DorisMetrics::instance()->base_compaction_task_running_total->increment(-1);
+
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+ _base_compaction_thread_pool->get_queue_size());
}
}};
do {
@@ -1121,6 +1139,13 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
tablet->execute_compaction(*compaction);
});
+ if (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
[[likely]] {
+
DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value(
+ _cumu_compaction_thread_pool->get_queue_size());
+ } else if (compaction_type == CompactionType::BASE_COMPACTION) {
+
DorisMetrics::instance()->base_compaction_task_pending_total->set_value(
+ _base_compaction_thread_pool->get_queue_size());
+ }
if (!st.ok()) {
if (!force) {
_permit_limiter.release(permits);
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 48b88cd5727..362efa86809 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -99,6 +99,17 @@
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(cumulative_compaction_bytes_total, MetricUn
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(full_compaction_bytes_total,
MetricUnit::BYTES, "",
compaction_bytes_total, Labels({{"type",
"full"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(base_compaction_task_running_total,
MetricUnit::ROWSETS, "",
+ compaction_task_state_total,
Labels({{"type", "base"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(base_compaction_task_pending_total,
MetricUnit::ROWSETS, "",
+ compaction_task_state_total,
Labels({{"type", "base"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(cumulative_compaction_task_running_total,
MetricUnit::ROWSETS,
+ "", compaction_task_state_total,
+ Labels({{"type", "cumulative"}}));
+DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(cumulative_compaction_task_pending_total,
MetricUnit::ROWSETS,
+ "", compaction_task_state_total,
+ Labels({{"type", "cumulative"}}));
+
DEFINE_COUNTER_METRIC_PROTOTYPE_5ARG(segment_read_total,
MetricUnit::OPERATIONS,
"(segment_v2) total number of segments
read", segment_read,
Labels({{"type", "segment_read_total"}}));
@@ -251,6 +262,11 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
full_compaction_deltas_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
full_compaction_bytes_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
base_compaction_task_running_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
base_compaction_task_pending_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
cumulative_compaction_task_running_total);
+ INT_COUNTER_METRIC_REGISTER(_server_metric_entity,
cumulative_compaction_task_pending_total);
+
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_read_total);
INT_COUNTER_METRIC_REGISTER(_server_metric_entity, segment_row_total);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 6fbc24d6922..0d9c060bfb8 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -95,6 +95,11 @@ public:
IntCounter* full_compaction_deltas_total = nullptr;
IntCounter* full_compaction_bytes_total = nullptr;
+ IntCounter* base_compaction_task_running_total = nullptr;
+ IntCounter* base_compaction_task_pending_total = nullptr;
+ IntCounter* cumulative_compaction_task_running_total = nullptr;
+ IntCounter* cumulative_compaction_task_pending_total = nullptr;
+
IntCounter* publish_task_request_total = nullptr;
IntCounter* publish_task_failed_total = nullptr;
diff --git a/be/test/olap/compaction_metrics_test.cpp
b/be/test/olap/compaction_metrics_test.cpp
new file mode 100644
index 00000000000..a556384ecc9
--- /dev/null
+++ b/be/test/olap/compaction_metrics_test.cpp
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+#include <unistd.h>
+
+#include <chrono>
+#include <filesystem>
+#include <memory>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "cpp/sync_point.h"
+#include "gtest/gtest_pred_impl.h"
+#include "io/fs/local_file_system.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/data_dir.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "util/doris_metrics.h"
+#include "util/threadpool.h"
+
+namespace doris {
+using namespace config;
+
+class CompactionMetricsTest : public testing::Test {
+public:
+ void SetUp() override {
+ _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp";
+ auto st =
io::global_local_filesystem()->delete_directory(_engine_data_path);
+ ASSERT_TRUE(st.ok()) << st;
+ st =
io::global_local_filesystem()->create_directory(_engine_data_path);
+ ASSERT_TRUE(st.ok()) << st;
+ EXPECT_TRUE(
+
io::global_local_filesystem()->create_directory(_engine_data_path +
"/meta").ok());
+
+ EngineOptions options;
+ options.backend_uid = UniqueId::gen_uid();
+ _storage_engine = std::make_unique<StorageEngine>(options);
+ _data_dir = std::make_unique<DataDir>(*_storage_engine,
_engine_data_path, 100000000);
+ static_cast<void>(_data_dir->init());
+ }
+
+ void TearDown() override {
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
+ }
+
+ std::unique_ptr<StorageEngine> _storage_engine;
+ std::string _engine_data_path;
+ std::unique_ptr<DataDir> _data_dir;
+};
+
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool
overlapping,
+ int data_size) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_rowset_type(BETA_ROWSET); // important
+ rs_meta->_rowset_meta_pb.set_start_version(version.first);
+ rs_meta->_rowset_meta_pb.set_end_version(version.second);
+ rs_meta->set_num_segments(num_segments);
+ rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+ rs_meta->set_total_disk_size(data_size);
+ RowsetSharedPtr rowset;
+ Status st = RowsetFactory::create_rowset(nullptr, "", rs_meta, &rowset);
+ if (!st.ok()) {
+ return nullptr;
+ }
+ return rowset;
+}
+
+TEST_F(CompactionMetricsTest, TestCompactionTaskNumWithDiffStatus) {
+ auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_base_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+ st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_cumu_compaction_thread_pool);
+ EXPECT_TRUE(st.ok());
+
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("olap_server::execute_compaction", [](auto&& values) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ bool* pred = try_any_cast<bool*>(values.back());
+ *pred = true;
+ });
+
+ for (int tablet_cnt = 0; tablet_cnt < 10; ++tablet_cnt) {
+ TabletMetaSharedPtr tablet_meta;
+ tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5,
TTabletSchema(), 6, {{7, 8}},
+ UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK,
+ TCompressionType::LZ4F));
+ TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()),
tablet_meta, _data_dir.get(),
+ CUMULATIVE_SIZE_BASED_POLICY));
+ st = tablet->init();
+ EXPECT_TRUE(st.ok());
+
+ for (int i = 2; i < 30; ++i) {
+ RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
+ tablet->_rs_version_map.emplace(rs->version(), rs);
+ }
+ tablet->_cumulative_point = 2;
+
+ st = _storage_engine->_submit_compaction_task(tablet,
CompactionType::CUMULATIVE_COMPACTION,
+ false);
+ EXPECT_TRUE(st.ok());
+ std::this_thread::sleep_for(std::chrono::milliseconds(150));
+
EXPECT_EQ(_storage_engine->_cumu_compaction_thread_pool->num_active_threads(),
+
DorisMetrics::instance()->cumulative_compaction_task_running_total->value());
+
EXPECT_EQ(_storage_engine->_cumu_compaction_thread_pool->get_queue_size(),
+
DorisMetrics::instance()->cumulative_compaction_task_pending_total->value());
+
EXPECT_EQ(_storage_engine->_base_compaction_thread_pool->num_active_threads(),
+
DorisMetrics::instance()->base_compaction_task_running_total->value());
+
EXPECT_EQ(_storage_engine->_base_compaction_thread_pool->get_queue_size(),
+
DorisMetrics::instance()->base_compaction_task_pending_total->value());
+ }
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]