This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 9f4e7346fbc [fix](compaction) fixing the inaccurate statistics of concurrent compaction tasks (#37318) (#37496)
9f4e7346fbc is described below
commit 9f4e7346fbcbf1df605a20fed5c129f913d0dd4a
Author: Luwei <[email protected]>
AuthorDate: Wed Jul 10 22:23:25 2024 +0800
[fix](compaction) fixing the inaccurate statistics of concurrent compaction tasks (#37318) (#37496)
---
be/src/olap/base_tablet.h | 9 +++
be/src/olap/olap_server.cpp | 51 ++++++++-----
be/src/olap/storage_engine.cpp | 12 ++--
be/src/olap/storage_engine.h | 8 ++-
be/src/olap/tablet_manager.cpp | 4 +-
be/src/olap/tablet_manager.h | 2 +-
be/test/olap/compaction_task_test.cpp | 132 ++++++++++++++++++++++++++++++++++
7 files changed, 189 insertions(+), 29 deletions(-)
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 2fa494b420a..768c69624fa 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -22,6 +22,7 @@
#include <string>
#include "common/status.h"
+#include "olap/olap_common.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
#include "util/metrics.h"
@@ -31,6 +32,8 @@ struct RowSetSplits;
struct RowsetWriterContext;
class RowsetWriter;
+enum class CompactionStage { NOT_SCHEDULED, PENDING, EXECUTING };
+
// Base class for all tablet classes
class BaseTablet {
public:
@@ -100,6 +103,12 @@ public:
IntCounter* flush_bytes = nullptr;
IntCounter* flush_finish_count = nullptr;
std::atomic<int64_t> published_count = 0;
+
+ std::atomic<int64_t> read_block_count = 0;
+ std::atomic<int64_t> write_count = 0;
+ std::atomic<int64_t> compaction_count = 0;
+
+ CompactionStage compaction_stage = CompactionStage::NOT_SCHEDULED;
};
} /* namespace doris */
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index fe21d1f7beb..038a5f2cd45 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -43,6 +43,7 @@
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
+#include "common/sync_point.h"
#include "gen_cpp/BackendService.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/Types_constants.h"
@@ -50,6 +51,7 @@
#include "gutil/ref_counted.h"
#include "io/fs/file_writer.h" // IWYU pragma: keep
#include "io/fs/path.h"
+#include "olap/base_tablet.h"
#include "olap/cold_data_compaction.h"
#include "olap/compaction_permit_limiter.h"
#include "olap/cumulative_compaction_policy.h"
@@ -598,8 +600,8 @@ void StorageEngine::_adjust_compaction_thread_num() {
void StorageEngine::_compaction_tasks_producer_callback() {
LOG(INFO) << "try to start compaction producer process!";
- std::unordered_set<TTabletId> tablet_submitted_cumu;
- std::unordered_set<TTabletId> tablet_submitted_base;
+ std::unordered_set<TabletSharedPtr> tablet_submitted_cumu;
+ std::unordered_set<TabletSharedPtr> tablet_submitted_base;
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
data_dirs.push_back(tmp_store.second);
@@ -841,6 +843,17 @@ void StorageEngine::get_tablet_rowset_versions(const
PGetTabletVersionsRequest*
response->mutable_status()->set_status_code(0);
}
+int StorageEngine::_get_executing_compaction_num(
+ std::unordered_set<TabletSharedPtr>& compaction_tasks) {
+ int num = 0;
+ for (const auto& task : compaction_tasks) {
+ if (task->compaction_stage == CompactionStage::EXECUTING) {
+ num++;
+ }
+ }
+ return num;
+}
+
bool need_generate_compaction_tasks(int count, int thread_per_disk,
CompactionType compaction_type,
bool all_base) {
if (count >= thread_per_disk) {
@@ -892,8 +905,8 @@ std::vector<TabletSharedPtr>
StorageEngine::_generate_compaction_tasks(
// Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex
// when traversing the data dir
- std::map<DataDir*, std::unordered_set<TTabletId>> copied_cumu_map;
- std::map<DataDir*, std::unordered_set<TTabletId>> copied_base_map;
+ std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_cumu_map;
+ std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_base_map;
{
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
copied_cumu_map = _tablet_submitted_cumu_compaction;
@@ -906,7 +919,8 @@ std::vector<TabletSharedPtr>
StorageEngine::_generate_compaction_tasks(
// in the current submitted tasks.
// If so, the last Slot can be assigned to Base compaction,
// otherwise, this Slot needs to be reserved for cumulative compaction.
- int count = copied_cumu_map[data_dir].size() +
copied_base_map[data_dir].size();
+ int count = _get_executing_compaction_num(copied_cumu_map[data_dir]) +
+ _get_executing_compaction_num(copied_base_map[data_dir]);
int thread_per_disk = data_dir->is_ssd_disk() ?
config::compaction_task_num_per_fast_disk
:
config::compaction_task_num_per_disk;
@@ -967,19 +981,16 @@ bool
StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table
bool already_existed = false;
switch (compaction_type) {
case CompactionType::CUMULATIVE_COMPACTION:
- already_existed =
!(_tablet_submitted_cumu_compaction[tablet->data_dir()]
- .insert(tablet->tablet_id())
- .second);
+ already_existed =
+
!(_tablet_submitted_cumu_compaction[tablet->data_dir()].insert(tablet).second);
break;
case CompactionType::BASE_COMPACTION:
- already_existed =
!(_tablet_submitted_base_compaction[tablet->data_dir()]
- .insert(tablet->tablet_id())
- .second);
+ already_existed =
+
!(_tablet_submitted_base_compaction[tablet->data_dir()].insert(tablet).second);
break;
case CompactionType::FULL_COMPACTION:
- already_existed =
!(_tablet_submitted_full_compaction[tablet->data_dir()]
- .insert(tablet->tablet_id())
- .second);
+ already_existed =
+
!(_tablet_submitted_full_compaction[tablet->data_dir()].insert(tablet).second);
break;
}
return already_existed;
@@ -991,13 +1002,13 @@ void
StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet
int removed = 0;
switch (compaction_type) {
case CompactionType::CUMULATIVE_COMPACTION:
- removed =
_tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+ removed =
_tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet);
break;
case CompactionType::BASE_COMPACTION:
- removed =
_tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+ removed =
_tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet);
break;
case CompactionType::FULL_COMPACTION:
- removed =
_tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+ removed =
_tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet);
break;
}
@@ -1029,6 +1040,7 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
tablet->tablet_id(), compaction_type);
}
std::shared_ptr<Compaction> compaction;
+ tablet->compaction_stage = CompactionStage::PENDING;
int64_t permits = 0;
Status st =
Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet,
compaction,
permits);
@@ -1042,17 +1054,21 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
: _base_compaction_thread_pool;
auto st = thread_pool->submit_func([tablet, compaction =
std::move(compaction),
compaction_type, permits, force,
this]() {
+ tablet->compaction_stage = CompactionStage::EXECUTING;
+
TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction");
tablet->execute_compaction(*compaction);
if (!force) {
_permit_limiter.release(permits);
}
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
+ tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
});
if (!st.ok()) {
if (!force) {
_permit_limiter.release(permits);
}
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
+ tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
return Status::InternalError(
"failed to submit compaction task to thread pool, "
"tablet_id={}, compaction_type={}.",
@@ -1061,6 +1077,7 @@ Status
StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
return Status::OK();
} else {
_pop_tablet_from_submitted_compaction(tablet, compaction_type);
+ tablet->compaction_stage = CompactionStage::NOT_SCHEDULED;
if (!st.ok()) {
return Status::InternalError(
"failed to prepare compaction task and calculate permits, "
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 05fd873fcc6..82c07a59152 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -1354,9 +1354,9 @@ Status
StorageEngine::get_compaction_status_json(std::string* result) {
rapidjson::Document arr;
arr.SetArray();
- for (auto& tablet_id : it.second) {
+ for (auto& tablet : it.second) {
rapidjson::Value key;
- const std::string& key_str = std::to_string(tablet_id);
+ const std::string& key_str = std::to_string(tablet->tablet_id());
key.SetString(key_str.c_str(), key_str.length(),
path_obj.GetAllocator());
arr.PushBack(key, root.GetAllocator());
}
@@ -1378,9 +1378,9 @@ Status
StorageEngine::get_compaction_status_json(std::string* result) {
rapidjson::Document arr;
arr.SetArray();
- for (auto& tablet_id : it.second) {
+ for (auto& tablet : it.second) {
rapidjson::Value key;
- const std::string& key_str = std::to_string(tablet_id);
+ const std::string& key_str = std::to_string(tablet->tablet_id());
key.SetString(key_str.c_str(), key_str.length(),
path_obj2.GetAllocator());
arr.PushBack(key, root.GetAllocator());
}
@@ -1402,9 +1402,9 @@ Status
StorageEngine::get_compaction_status_json(std::string* result) {
rapidjson::Document arr;
arr.SetArray();
- for (auto& tablet_id : it.second) {
+ for (auto& tablet : it.second) {
rapidjson::Value key;
- const std::string& key_str = std::to_string(tablet_id);
+ const std::string& key_str = std::to_string(tablet->tablet_id());
key.SetString(key_str.c_str(), key_str.length(),
path_obj3.GetAllocator());
arr.PushBack(key, root.GetAllocator());
}
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 9dc18dfb276..e2a4527d8fd 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -341,6 +341,8 @@ private:
int32_t _auto_get_interval_by_disk_capacity(DataDir* data_dir);
+ int _get_executing_compaction_num(std::unordered_set<TabletSharedPtr>&
compaction_tasks);
+
private:
EngineOptions _options;
std::mutex _store_lock;
@@ -421,9 +423,9 @@ private:
std::mutex _tablet_submitted_compaction_mutex;
// a tablet can do base and cumulative compaction at same time
- std::map<DataDir*, std::unordered_set<TTabletId>>
_tablet_submitted_cumu_compaction;
- std::map<DataDir*, std::unordered_set<TTabletId>>
_tablet_submitted_base_compaction;
- std::map<DataDir*, std::unordered_set<TTabletId>>
_tablet_submitted_full_compaction;
+ std::map<DataDir*, std::unordered_set<TabletSharedPtr>>
_tablet_submitted_cumu_compaction;
+ std::map<DataDir*, std::unordered_set<TabletSharedPtr>>
_tablet_submitted_base_compaction;
+ std::map<DataDir*, std::unordered_set<TabletSharedPtr>>
_tablet_submitted_full_compaction;
std::mutex _low_priority_task_nums_mutex;
std::unordered_map<DataDir*, int32_t> _low_priority_task_nums;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 36efaa8380a..875876d3b17 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -721,7 +721,7 @@ void TabletManager::get_tablet_stat(TTabletStatResult*
result) {
TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
- const std::unordered_set<TTabletId>& tablet_submitted_compaction,
uint32_t* score,
+ const std::unordered_set<TabletSharedPtr>&
tablet_submitted_compaction, uint32_t* score,
const std::unordered_map<std::string_view,
std::shared_ptr<CumulativeCompactionPolicy>>&
all_cumulative_compaction_policies) {
int64_t now_ms = UnixMillis();
@@ -746,7 +746,7 @@ TabletSharedPtr
TabletManager::find_best_tablet_to_compaction(
return;
}
- auto search =
tablet_submitted_compaction.find(tablet_ptr->tablet_id());
+ auto search = tablet_submitted_compaction.find(tablet_ptr);
if (search != tablet_submitted_compaction.end()) {
return;
}
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 800e08a289d..5f6b8f31bd8 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -75,7 +75,7 @@ public:
TabletSharedPtr find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
- const std::unordered_set<TTabletId>& tablet_submitted_compaction,
uint32_t* score,
+ const std::unordered_set<TabletSharedPtr>&
tablet_submitted_compaction, uint32_t* score,
const std::unordered_map<std::string_view,
std::shared_ptr<CumulativeCompactionPolicy>>&
all_cumulative_compaction_policies);
diff --git a/be/test/olap/compaction_task_test.cpp b/be/test/olap/compaction_task_test.cpp
new file mode 100644
index 00000000000..76c5e84b48a
--- /dev/null
+++ b/be/test/olap/compaction_task_test.cpp
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+#include <gtest/gtest.h>
+
+#include <filesystem>
+#include <memory>
+
+#include "common/status.h"
+#include "common/sync_point.h"
+#include "gtest/gtest_pred_impl.h"
+#include "io/fs/local_file_system.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/data_dir.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "util/threadpool.h"
+
+namespace doris {
+using namespace config;
+
+class CompactionTaskTest : public testing::Test {
+public:
+ virtual void SetUp() {
+ _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp";
+ auto st =
io::global_local_filesystem()->delete_directory(_engine_data_path);
+ ASSERT_TRUE(st.ok()) << st;
+ st =
io::global_local_filesystem()->create_directory(_engine_data_path);
+ ASSERT_TRUE(st.ok()) << st;
+ EXPECT_TRUE(
+
io::global_local_filesystem()->create_directory(_engine_data_path +
"/meta").ok());
+
+ EngineOptions options;
+ options.backend_uid = UniqueId::gen_uid();
+ _storage_engine = std::make_unique<StorageEngine>(options);
+ _data_dir = std::make_unique<DataDir>(_engine_data_path);
+ static_cast<void>(_data_dir->init());
+ }
+
+ virtual void TearDown() {
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok());
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
+ }
+
+ std::unique_ptr<StorageEngine> _storage_engine;
+ std::string _engine_data_path;
+ std::unique_ptr<DataDir> _data_dir;
+};
+
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool
overlapping,
+ int data_size) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_rowset_type(BETA_ROWSET); // important
+ rs_meta->_rowset_meta_pb.set_start_version(version.first);
+ rs_meta->_rowset_meta_pb.set_end_version(version.second);
+ rs_meta->set_num_segments(num_segments);
+ rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+ rs_meta->set_total_disk_size(data_size);
+ RowsetSharedPtr rowset;
+ Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta),
&rowset);
+ if (!st.ok()) {
+ return nullptr;
+ }
+ return rowset;
+}
+
+TEST_F(CompactionTaskTest, TestSubmitCompactionTask) {
+ auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_base_compaction_thread_pool);
+ EXPECT_TRUE(st.OK());
+ st = ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+ .set_min_threads(2)
+ .set_max_threads(2)
+ .build(&_storage_engine->_cumu_compaction_thread_pool);
+ EXPECT_TRUE(st.OK());
+
+ auto* sp = SyncPoint::get_instance();
+ sp->enable_processing();
+ sp->set_call_back("olap_server::execute_compaction", [](auto&& values) {
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+ bool* pred = try_any_cast<bool*>(values.back());
+ *pred = true;
+ });
+
+ for (int tablet_cnt = 0; tablet_cnt < 10; ++tablet_cnt) {
+ TabletMetaSharedPtr tablet_meta;
+ tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5,
TTabletSchema(), 6, {{7, 8}},
+ UniqueId(9, 10),
TTabletType::TABLET_TYPE_DISK,
+ TCompressionType::LZ4F));
+ TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()),
tablet_meta, _data_dir.get(),
+ CUMULATIVE_SIZE_BASED_POLICY));
+ st = tablet->init();
+ EXPECT_TRUE(st.OK());
+
+ for (int i = 2; i < 30; ++i) {
+ RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
+ tablet->_rs_version_map.emplace(rs->version(), rs);
+ }
+ tablet->_cumulative_point = 2;
+
+ st = _storage_engine->_submit_compaction_task(tablet,
CompactionType::CUMULATIVE_COMPACTION,
+ false);
+ EXPECT_TRUE(st.OK());
+ }
+
+ int executing_task_num = _storage_engine->_get_executing_compaction_num(
+
_storage_engine->_tablet_submitted_cumu_compaction[_data_dir.get()]);
+ EXPECT_EQ(executing_task_num, 2);
+}
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]