This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c8aca786315 [enhancement](compaction) optimize the cpu consumption of the compaction task producer thread (#40152)
c8aca786315 is described below
commit c8aca786315808d10cbe477e2113da8af10a11c0
Author: Luwei <[email protected]>
AuthorDate: Mon Oct 14 09:56:07 2024 +0800
[enhancement](compaction) optimize the cpu consumption of the compaction task producer thread (#40152)
Co-authored-by: Yongqiang YANG <[email protected]>
---
be/src/common/config.cpp | 4 +-
be/src/common/config.h | 2 +
be/src/olap/compaction.cpp | 1 +
be/src/olap/tablet.cpp | 47 +++++-
be/src/olap/tablet.h | 25 +++-
be/src/olap/tablet_manager.cpp | 19 ++-
be/test/olap/compaction_score_test.cpp | 158 +++++++++++++++++++++
be/test/olap/cumulative_compaction_policy_test.cpp | 8 +-
...mulative_compaction_time_series_policy_test.cpp | 12 +-
be/test/olap/tablet_mgr_test.cpp | 6 +-
10 files changed, 256 insertions(+), 26 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5527ab07885..071eac13e81 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -396,7 +396,7 @@ DEFINE_mInt64(base_compaction_max_compaction_score, "20");
DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");
-DEFINE_Bool(enable_skip_tablet_compaction, "true");
+DEFINE_Bool(enable_skip_tablet_compaction, "false");
// output rowset of cumulative compaction total disk size exceed this config
size,
// this rowset will be given to base compaction, unit is m byte.
DEFINE_mInt64(compaction_promotion_size_mbytes, "1024");
@@ -1348,6 +1348,8 @@ DEFINE_mInt32(lz4_compression_block_size, "262144");
DEFINE_mBool(enable_pipeline_task_leakage_detect, "false");
+DEFINE_mInt32(check_score_rounds_num, "1000");
+
DEFINE_Int32(query_cache_size, "512");
// clang-format off
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e2789913703..585c4dc45cc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1433,6 +1433,8 @@ DECLARE_mInt32(lz4_compression_block_size);
DECLARE_mBool(enable_pipeline_task_leakage_detect);
+DECLARE_mInt32(check_score_rounds_num);
+
// MB
DECLARE_Int32(query_cache_size);
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 8b37e9ba174..84830502366 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1093,6 +1093,7 @@ Status CompactionMixin::modify_rowsets() {
LOG(WARNING) << "failed to remove old version delete bitmap, st: "
<< st;
}
}
+
return Status::OK();
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 51eabe5495e..1e8518b47dc 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -57,7 +57,6 @@
#include "agent/utils.h"
#include "common/config.h"
#include "common/consts.h"
-#include "common/exception.h"
#include "common/logging.h"
#include "common/signal_handler.h"
#include "common/status.h"
@@ -489,6 +488,7 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
RETURN_IF_ERROR(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
_rs_version_map[rowset->version()] = rowset;
_timestamped_version_tracker.add_version(rowset->version());
+ add_compaction_score(rowset->rowset_meta()->get_compaction_score());
std::vector<RowsetSharedPtr> rowsets_to_delete;
// yiguolei: temp code, should remove the rowset contains by this rowset
@@ -594,6 +594,17 @@ Status
Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
}
}
}
+
+ int32_t add_score = 0;
+ for (auto rs : to_add) {
+ add_score += rs->rowset_meta()->get_compaction_score();
+ }
+ int32_t sub_score = 0;
+ for (auto rs : to_delete) {
+ sub_score += rs->rowset_meta()->get_compaction_score();
+ }
+ add_compaction_score(add_score - sub_score);
+
return Status::OK();
}
@@ -668,6 +679,9 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr&
rowset) {
_timestamped_version_tracker.add_version(rowset->version());
++_newly_created_rowset_num;
+
+ add_compaction_score(rowset->rowset_meta()->get_compaction_score());
+
return Status::OK();
}
@@ -983,17 +997,41 @@ bool Tablet::can_do_compaction(size_t path_hash,
CompactionType compaction_type)
return tablet_state() == TABLET_RUNNING || tablet_state() ==
TABLET_NOTREADY;
}
-uint32_t Tablet::calc_compaction_score(
+uint32_t Tablet::calc_compaction_score() {
+ if (_score_check_cnt++ % config::check_score_rounds_num != 0) {
+ std::shared_lock rdlock(_meta_lock);
+ if (_compaction_score > 0) {
+ return _compaction_score;
+ }
+ }
+
+ {
+ // Need meta lock, because it will iterator "all_rs_metas" of tablet
meta.
+ std::shared_lock rdlock(_meta_lock);
+ int32_t score = get_real_compaction_score();
+ if (_compaction_score > 0 && _compaction_score != score) {
+ LOG(WARNING) << "cumu cache score not equal real score, cache
score; "
+ << _compaction_score << ", real score: " << score
+ << ", tablet: " << tablet_id();
+ }
+ _compaction_score = score;
+ return score;
+ }
+}
+
+bool Tablet::suitable_for_compaction(
CompactionType compaction_type,
std::shared_ptr<CumulativeCompactionPolicy>
cumulative_compaction_policy) {
// Need meta lock, because it will iterator "all_rs_metas" of tablet meta.
std::shared_lock rdlock(_meta_lock);
+ int32_t score = -1;
if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) {
- return _calc_cumulative_compaction_score(cumulative_compaction_policy);
+ score =
_calc_cumulative_compaction_score(cumulative_compaction_policy);
} else {
DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION);
- return _calc_base_compaction_score();
+ score = _calc_base_compaction_score();
}
+ return score > 0;
}
uint32_t Tablet::calc_cold_data_compaction_score() const {
@@ -1790,6 +1828,7 @@ void Tablet::execute_compaction(CompactionMixin&
compaction) {
watch.start();
Status res = [&]() { RETURN_IF_CATCH_EXCEPTION({ return
compaction.execute_compact(); }); }();
+
if (!res.ok()) [[unlikely]] {
set_last_failure_time(this, compaction, UnixMillis());
LOG(WARNING) << "failed to do " << compaction.compaction_name()
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 33253e82ced..d7d10978d5a 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -221,10 +221,12 @@ public:
// operation for compaction
bool can_do_compaction(size_t path_hash, CompactionType compaction_type);
- uint32_t calc_compaction_score(
+ bool suitable_for_compaction(
CompactionType compaction_type,
std::shared_ptr<CumulativeCompactionPolicy>
cumulative_compaction_policy);
+ uint32_t calc_compaction_score();
+
// This function to find max continuous version from the beginning.
// For example: If there are 1, 2, 3, 5, 6, 7 versions belongs tablet,
then 3 is target.
// 3 will be saved in "version", and 7 will be saved in "max_version", if
max_version != nullptr
@@ -482,6 +484,24 @@ public:
inline bool is_full_compaction_running() const { return
_is_full_compaction_running; }
void clear_cache() override;
+ int32_t get_compaction_score() const { return _compaction_score; }
+
+ void set_compaction_score(int32_t compaction_score) { _compaction_score =
compaction_score; }
+
+ void add_compaction_score(int32_t score) {
+ if (_compaction_score < 0) {
+ return;
+ }
+ _compaction_score += score;
+ }
+
+ void minus_compaction_score(int32_t score) {
+ if (_compaction_score < 0) {
+ return;
+ }
+ _compaction_score -= score;
+ }
+
private:
Status _init_once_action();
bool _contains_rowset(const RowsetId rowset_id);
@@ -608,6 +628,9 @@ private:
std::shared_ptr<const VersionWithTime> _visible_version;
std::atomic_bool _is_full_compaction_running = false;
+
+ int32_t _compaction_score = -1;
+ int32_t _score_check_cnt = 0;
};
inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 468a6b2fb12..64eb408c9e3 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -797,8 +797,7 @@ std::vector<TabletSharedPtr>
TabletManager::find_best_tablets_to_compaction(
}
auto cumulative_compaction_policy =
all_cumulative_compaction_policies.at(
tablet_ptr->tablet_meta()->compaction_policy());
- uint32_t current_compaction_score =
- tablet_ptr->calc_compaction_score(compaction_type,
cumulative_compaction_policy);
+ uint32_t current_compaction_score =
tablet_ptr->calc_compaction_score();
if (current_compaction_score < 5) {
tablet_ptr->set_skip_compaction(true, compaction_type,
UnixSeconds());
}
@@ -806,14 +805,22 @@ std::vector<TabletSharedPtr>
TabletManager::find_best_tablets_to_compaction(
// tablet should do single compaction
if (current_compaction_score > single_compact_highest_score &&
tablet_ptr->should_fetch_from_peer()) {
- single_compact_highest_score = current_compaction_score;
- best_single_compact_tablet = tablet_ptr;
+ bool ret = tablet_ptr->suitable_for_compaction(compaction_type,
+
cumulative_compaction_policy);
+ if (ret) {
+ single_compact_highest_score = current_compaction_score;
+ best_single_compact_tablet = tablet_ptr;
+ }
}
// tablet should do cumu or base compaction
if (current_compaction_score > highest_score &&
!tablet_ptr->should_fetch_from_peer()) {
- highest_score = current_compaction_score;
- best_tablet = tablet_ptr;
+ bool ret = tablet_ptr->suitable_for_compaction(compaction_type,
+
cumulative_compaction_policy);
+ if (ret) {
+ highest_score = current_compaction_score;
+ best_tablet = tablet_ptr;
+ }
}
};
diff --git a/be/test/olap/compaction_score_test.cpp
b/be/test/olap/compaction_score_test.cpp
new file mode 100644
index 00000000000..de4e5cdde0a
--- /dev/null
+++ b/be/test/olap/compaction_score_test.cpp
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <filesystem>
+#include <memory>
+
+#include "common/status.h"
+#include "cpp/sync_point.h"
+#include "gtest/gtest_pred_impl.h"
+#include "io/fs/local_file_system.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/data_dir.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "util/threadpool.h"
+
+namespace doris {
+using namespace config;
+
+class CompactionScoreTest : public testing::Test {
+public:
+ virtual void SetUp() {
+ _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp";
+ auto st =
io::global_local_filesystem()->delete_directory(_engine_data_path);
+ ASSERT_TRUE(st.ok()) << st;
+ st =
io::global_local_filesystem()->create_directory(_engine_data_path);
+ ASSERT_TRUE(st.ok()) << st;
+ EXPECT_TRUE(
+
io::global_local_filesystem()->create_directory(_engine_data_path +
"/meta").ok());
+
+ EngineOptions options;
+ options.backend_uid = UniqueId::gen_uid();
+ _storage_engine = std::make_unique<StorageEngine>(options);
+ _data_dir = std::make_unique<DataDir>(*_storage_engine,
_engine_data_path, 100000000);
+ static_cast<void>(_data_dir->init());
+ }
+
+ virtual void TearDown() {
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
+ }
+
+ RowsetSharedPtr create_rowset(Version version, int num_segments, bool
overlapping,
+ int data_size) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_rowset_type(BETA_ROWSET); // important
+ rs_meta->_rowset_meta_pb.set_start_version(version.first);
+ rs_meta->_rowset_meta_pb.set_end_version(version.second);
+ rs_meta->set_rowset_id(_storage_engine->next_rowset_id());
+ rs_meta->set_num_segments(num_segments);
+ rs_meta->set_segments_overlap(overlapping ? OVERLAPPING :
NONOVERLAPPING);
+ rs_meta->set_total_disk_size(data_size);
+ RowsetSharedPtr rowset;
+ Status st = RowsetFactory::create_rowset(nullptr, "",
std::move(rs_meta), &rowset);
+ if (!st.ok()) {
+ return nullptr;
+ }
+ return rowset;
+ }
+
+ std::unique_ptr<StorageEngine> _storage_engine;
+ std::string _engine_data_path;
+ std::unique_ptr<DataDir> _data_dir;
+};
+
+TEST_F(CompactionScoreTest, TestCompactionScore) {
+ /*
+ auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_base_compaction_thread_pool);
+ EXPECT_TRUE(st.OK());
+ st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_cumu_compaction_thread_pool);
+ EXPECT_TRUE(st.OK());
+ */
+
+ /*
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("olap_server::execute_compaction", [](auto&& values) {
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+ bool* pred = try_any_cast<bool*>(values.back());
+ *pred = true;
+ });
+ */
+
+ TabletMetaSharedPtr tablet_meta;
+ tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5,
TTabletSchema(), 6, {{7, 8}},
+ UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK,
+ TCompressionType::LZ4F));
+ TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()), tablet_meta,
_data_dir.get(),
+ CUMULATIVE_SIZE_BASED_POLICY));
+ Status st = tablet->init();
+ EXPECT_TRUE(st.OK());
+
+ for (int i = 2; i < 10; ++i) {
+ RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 102400);
+ st = tablet->add_inc_rowset(rs);
+ EXPECT_TRUE(st.OK());
+ }
+ EXPECT_EQ(tablet->get_compaction_score(), -1);
+ EXPECT_EQ(tablet->calc_compaction_score(), 8);
+ EXPECT_EQ(tablet->get_real_compaction_score(), 8);
+
+ for (int i = 10; i < 30; ++i) {
+ RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 102400);
+ st = tablet->add_inc_rowset(rs);
+ EXPECT_TRUE(st.OK());
+ }
+ EXPECT_EQ(tablet->get_compaction_score(), 28);
+ EXPECT_EQ(tablet->calc_compaction_score(), 28);
+ EXPECT_EQ(tablet->get_real_compaction_score(), 28);
+
+ std::vector<RowsetSharedPtr> input_rowsets = tablet->get_snapshot_rowset();
+ for (auto it = input_rowsets.begin(); it != input_rowsets.end();) {
+ if ((*it)->start_version() < 10) {
+ it = input_rowsets.erase(it);
+ } else {
+ it++;
+ }
+ }
+
+ RowsetSharedPtr rs = create_rowset({10, 29}, 1, false, 102400);
+ std::vector<RowsetSharedPtr> output_rowsets;
+ output_rowsets.push_back(rs);
+ st = tablet->modify_rowsets(output_rowsets, input_rowsets, true);
+ EXPECT_TRUE(st.OK());
+
+ EXPECT_EQ(tablet->get_compaction_score(), 9);
+ EXPECT_EQ(tablet->calc_compaction_score(), 9);
+ EXPECT_EQ(tablet->get_real_compaction_score(), 9);
+}
+
+} // namespace doris
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp
b/be/test/olap/cumulative_compaction_policy_test.cpp
index e4775031c28..1fc735296ce 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -350,8 +350,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
calc_cumulative_compaction_score
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
CUMULATIVE_SIZE_BASED_POLICY);
- const uint32_t score =
_tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
-
cumulative_compaction_policy);
+ const uint32_t score = _tablet->calc_compaction_score();
EXPECT_EQ(15, score);
}
@@ -372,10 +371,9 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy,
calc_cumulative_compaction_score
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
CUMULATIVE_SIZE_BASED_POLICY);
- const uint32_t score =
_tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
-
cumulative_compaction_policy);
+ const uint32_t score = _tablet->calc_compaction_score();
- EXPECT_EQ(7, score);
+ EXPECT_EQ(9, score);
}
TEST_F(TestSizeBasedCumulativeCompactionPolicy,
calculate_cumulative_point_big_base) {
diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
index 3e88e424e43..79e540fca77 100644
--- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
@@ -401,10 +401,10 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy,
calc_cumulative_compaction_scor
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
CUMULATIVE_TIME_SERIES_POLICY);
- const uint32_t score =
_tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
-
cumulative_compaction_policy);
+ bool ret =
_tablet->suitable_for_compaction(CompactionType::CUMULATIVE_COMPACTION,
+ cumulative_compaction_policy);
- EXPECT_EQ(9, score);
+ EXPECT_EQ(true, ret);
}
TEST_F(TestTimeSeriesCumulativeCompactionPolicy,
calc_cumulative_compaction_score_big_rowset) {
@@ -423,10 +423,10 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy,
calc_cumulative_compaction_scor
std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy =
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
CUMULATIVE_TIME_SERIES_POLICY);
- const uint32_t score =
_tablet->calc_compaction_score(CompactionType::CUMULATIVE_COMPACTION,
-
cumulative_compaction_policy);
+ bool ret =
_tablet->suitable_for_compaction(CompactionType::CUMULATIVE_COMPACTION,
+ cumulative_compaction_policy);
- EXPECT_EQ(5, score);
+ EXPECT_EQ(true, ret);
}
TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_candidate_rowsets) {
diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp
index c3cd74b55ed..1bcdcdf45c6 100644
--- a/be/test/olap/tablet_mgr_test.cpp
+++ b/be/test/olap/tablet_mgr_test.cpp
@@ -436,7 +436,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
cumulative_compaction_policies);
ASSERT_EQ(compact_tablets.size(), 1);
ASSERT_EQ(compact_tablets[0]->tablet_id(), 10);
- ASSERT_EQ(score, 13);
+ ASSERT_EQ(score, 14);
// create 10 tablets enable single compact
// 5 tablets do cumu compaction, 5 tablets do single compaction
@@ -451,7 +451,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
ASSERT_EQ(compact_tablets.size(), 2);
ASSERT_EQ(compact_tablets[0]->tablet_id(), 19);
ASSERT_EQ(compact_tablets[1]->tablet_id(), 20);
- ASSERT_EQ(score, 23);
+ ASSERT_EQ(score, 24);
create_tablet(21, false, rowset_size++);
@@ -460,7 +460,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) {
cumulative_compaction_policies);
ASSERT_EQ(compact_tablets.size(), 1);
ASSERT_EQ(compact_tablets[0]->tablet_id(), 21);
- ASSERT_EQ(score, 24);
+ ASSERT_EQ(score, 25);
// drop all tablets
for (int64_t id = 1; id <= 20; ++id) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]