This is an automated email from the ASF dual-hosted git repository. gavinchou pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 5641c778fb0 [fix](compaction) fixing the inaccurate statistics of concurrent compaction tasks (#37318) 5641c778fb0 is described below commit 5641c778fb0c7d9f4145e7a764eca5988efca18d Author: Luwei <814383...@qq.com> AuthorDate: Sat Jul 6 00:03:15 2024 +0800 [fix](compaction) fixing the inaccurate statistics of concurrent compaction tasks (#37318) Specify the specific stages of compaction tasks and accurately count concurrent tasks only while they are actively running --- be/src/olap/base_tablet.h | 4 ++ be/src/olap/olap_server.cpp | 51 ++++++++----- be/src/olap/storage_engine.cpp | 12 ++-- be/src/olap/storage_engine.h | 8 ++- be/src/olap/tablet_manager.cpp | 4 +- be/src/olap/tablet_manager.h | 2 +- be/test/olap/compaction_task_test.cpp | 132 ++++++++++++++++++++++++++++++++++ be/test/olap/tablet_mgr_test.cpp | 2 +- 8 files changed, 185 insertions(+), 30 deletions(-) diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 4852a6cba9b..f625ecf4a0a 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -23,6 +23,7 @@ #include "common/status.h" #include "olap/iterators.h" +#include "olap/olap_common.h" #include "olap/partial_update_info.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/tablet_fwd.h" @@ -43,6 +44,8 @@ struct TabletWithVersion { int64_t version; }; +enum class CompactionStage { NOT_SCHEDULED, PENDING, EXECUTING }; + // Base class for all tablet classes class BaseTablet { public: @@ -301,6 +304,7 @@ public: std::atomic<int64_t> write_count = 0; std::atomic<int64_t> compaction_count = 0; + CompactionStage compaction_stage = CompactionStage::NOT_SCHEDULED; std::mutex sample_info_lock; std::vector<CompactionSampleInfo> sample_infos; Status last_compaction_status = Status::OK(); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 91c13918818..07acf77eed9 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -44,6 +44,7 @@ #include "common/config.h" #include "common/logging.h" #include "common/status.h" +#include "common/sync_point.h" #include "gen_cpp/BackendService.h" #include "gen_cpp/FrontendService.h" #include "gen_cpp/Types_constants.h" @@ -51,6 +52,7 @@ #include "gutil/ref_counted.h" #include "io/fs/file_writer.h" // IWYU pragma: keep #include "io/fs/path.h" +#include "olap/base_tablet.h" #include "olap/cold_data_compaction.h" #include "olap/compaction_permit_limiter.h" #include "olap/cumulative_compaction_policy.h" @@ -603,8 +605,8 @@ void StorageEngine::_adjust_compaction_thread_num() { void StorageEngine::_compaction_tasks_producer_callback() { LOG(INFO) << "try to start compaction producer process!"; - std::unordered_set<TTabletId> tablet_submitted_cumu; - std::unordered_set<TTabletId> tablet_submitted_base; + std::unordered_set<TabletSharedPtr> tablet_submitted_cumu; + std::unordered_set<TabletSharedPtr> tablet_submitted_base; std::vector<DataDir*> data_dirs = get_stores(); for (auto& data_dir : data_dirs) { _tablet_submitted_cumu_compaction[data_dir] = tablet_submitted_cumu; @@ -884,6 +886,17 @@ int get_concurrent_per_disk(int max_score, int thread_per_disk) { return thread_per_disk; } +int StorageEngine::_get_executing_compaction_num( + std::unordered_set<TabletSharedPtr>& compaction_tasks) { + int num = 0; + for (const auto& task : compaction_tasks) { + if (task->compaction_stage == CompactionStage::EXECUTING) { + num++; + } + } + return num; +} + std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) { _update_cumulative_compaction_policy(); @@ -896,8 +909,8 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( // Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex // when traversing the data dir - std::map<DataDir*, std::unordered_set<TTabletId>> copied_cumu_map; - std::map<DataDir*, std::unordered_set<TTabletId>> copied_base_map; + std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_cumu_map; + std::map<DataDir*, std::unordered_set<TabletSharedPtr>> copied_base_map; { std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex); copied_cumu_map = _tablet_submitted_cumu_compaction; @@ -910,7 +923,8 @@ std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( // in the current submitted tasks. // If so, the last Slot can be assigned to Base compaction, // otherwise, this Slot needs to be reserved for cumulative compaction. - int count = copied_cumu_map[data_dir].size() + copied_base_map[data_dir].size(); + int count = _get_executing_compaction_num(copied_cumu_map[data_dir]) + + _get_executing_compaction_num(copied_base_map[data_dir]); int thread_per_disk = data_dir->is_ssd_disk() ? config::compaction_task_num_per_fast_disk : config::compaction_task_num_per_disk; @@ -973,19 +987,16 @@ bool StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr table bool already_existed = false; switch (compaction_type) { case CompactionType::CUMULATIVE_COMPACTION: - already_existed = !(_tablet_submitted_cumu_compaction[tablet->data_dir()] - .insert(tablet->tablet_id()) - .second); + already_existed = + !(_tablet_submitted_cumu_compaction[tablet->data_dir()].insert(tablet).second); break; case CompactionType::BASE_COMPACTION: - already_existed = !(_tablet_submitted_base_compaction[tablet->data_dir()] - .insert(tablet->tablet_id()) - .second); + already_existed = + !(_tablet_submitted_base_compaction[tablet->data_dir()].insert(tablet).second); break; case CompactionType::FULL_COMPACTION: - already_existed = !(_tablet_submitted_full_compaction[tablet->data_dir()] - .insert(tablet->tablet_id()) - .second); + already_existed = + !(_tablet_submitted_full_compaction[tablet->data_dir()].insert(tablet).second); break; } return already_existed; @@ -997,13 +1008,13 @@ void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet int removed = 0; switch (compaction_type) { case CompactionType::CUMULATIVE_COMPACTION: - removed = _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet->tablet_id()); + removed = _tablet_submitted_cumu_compaction[tablet->data_dir()].erase(tablet); break; case CompactionType::BASE_COMPACTION: - removed = _tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id()); + removed = _tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet); break; case CompactionType::FULL_COMPACTION: - removed = _tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet->tablet_id()); + removed = _tablet_submitted_full_compaction[tablet->data_dir()].erase(tablet); break; } @@ -1034,6 +1045,7 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, "compaction task has already been submitted, tablet_id={}, compaction_type={}.", tablet->tablet_id(), compaction_type); } + tablet->compaction_stage = CompactionStage::PENDING; std::shared_ptr<CompactionMixin> compaction; int64_t permits = 0; Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet, @@ -1048,17 +1060,21 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, : _base_compaction_thread_pool; auto st = thread_pool->submit_func([tablet, compaction = std::move(compaction), compaction_type, permits, force, this]() { + tablet->compaction_stage = CompactionStage::EXECUTING; + TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction"); tablet->execute_compaction(*compaction); if (!force) { _permit_limiter.release(permits); } _pop_tablet_from_submitted_compaction(tablet, compaction_type); + tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; }); if (!st.ok()) { if (!force) { _permit_limiter.release(permits); } _pop_tablet_from_submitted_compaction(tablet, compaction_type); + tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; return Status::InternalError( "failed to submit compaction task to thread pool, " "tablet_id={}, compaction_type={}.", @@ -1067,6 +1083,7 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, return Status::OK(); } else { _pop_tablet_from_submitted_compaction(tablet, compaction_type); + tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; if (!st.ok()) { return Status::InternalError( "failed to prepare compaction task and calculate permits, " diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 90093241ad2..43093d3183e 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1404,9 +1404,9 @@ Status StorageEngine::get_compaction_status_json(std::string* result) { rapidjson::Document arr; arr.SetArray(); - for (auto& tablet_id : it.second) { + for (auto& tablet : it.second) { rapidjson::Value key; - const std::string& key_str = std::to_string(tablet_id); + const std::string& key_str = std::to_string(tablet->tablet_id()); key.SetString(key_str.c_str(), key_str.length(), path_obj.GetAllocator()); arr.PushBack(key, root.GetAllocator()); } @@ -1428,9 +1428,9 @@ Status StorageEngine::get_compaction_status_json(std::string* result) { rapidjson::Document arr; arr.SetArray(); - for (auto& tablet_id : it.second) { + for (auto& tablet : it.second) { rapidjson::Value key; - const std::string& key_str = std::to_string(tablet_id); + const std::string& key_str = std::to_string(tablet->tablet_id()); key.SetString(key_str.c_str(), key_str.length(), path_obj2.GetAllocator()); arr.PushBack(key, root.GetAllocator()); } @@ -1452,9 +1452,9 @@ Status StorageEngine::get_compaction_status_json(std::string* result) { rapidjson::Document arr; arr.SetArray(); - for (auto& tablet_id : it.second) { + for (auto& tablet : it.second) { rapidjson::Value key; - const std::string& key_str = std::to_string(tablet_id); + const std::string& key_str = std::to_string(tablet->tablet_id()); key.SetString(key_str.c_str(), key_str.length(), path_obj3.GetAllocator()); arr.PushBack(key, root.GetAllocator()); } diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 5ddd888db6d..64312a2b2b7 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -389,6 +389,8 @@ private: int32_t _auto_get_interval_by_disk_capacity(DataDir* data_dir); + int _get_executing_compaction_num(std::unordered_set<TabletSharedPtr>& compaction_tasks); + private: EngineOptions _options; std::mutex _store_lock; @@ -451,9 +453,9 @@ private: std::mutex _tablet_submitted_compaction_mutex; // a tablet can do base and cumulative compaction at same time - std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_cumu_compaction; - std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_base_compaction; - std::map<DataDir*, std::unordered_set<TTabletId>> _tablet_submitted_full_compaction; + std::map<DataDir*, std::unordered_set<TabletSharedPtr>> _tablet_submitted_cumu_compaction; + std::map<DataDir*, std::unordered_set<TabletSharedPtr>> _tablet_submitted_base_compaction; + std::map<DataDir*, std::unordered_set<TabletSharedPtr>> _tablet_submitted_full_compaction; std::mutex _low_priority_task_nums_mutex; std::unordered_map<DataDir*, int32_t> _low_priority_task_nums; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 1f8017de44e..a234ab93a47 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -722,7 +722,7 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) { std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction( CompactionType compaction_type, DataDir* data_dir, - const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score, + const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score, const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>& all_cumulative_compaction_policies) { int64_t now_ms = UnixMillis(); @@ -749,7 +749,7 @@ std::vector<TabletSharedPtr> TabletManager::find_best_tablets_to_compaction( return; } - auto search = tablet_submitted_compaction.find(tablet_ptr->tablet_id()); + auto search = tablet_submitted_compaction.find(tablet_ptr); if (search != tablet_submitted_compaction.end()) { return; } diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 51cc2af404a..809a2237356 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -82,7 +82,7 @@ public: // single compaction tasks for the tablet. std::vector<TabletSharedPtr> find_best_tablets_to_compaction( CompactionType compaction_type, DataDir* data_dir, - const std::unordered_set<TTabletId>& tablet_submitted_compaction, uint32_t* score, + const std::unordered_set<TabletSharedPtr>& tablet_submitted_compaction, uint32_t* score, const std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>& all_cumulative_compaction_policies); diff --git a/be/test/olap/compaction_task_test.cpp b/be/test/olap/compaction_task_test.cpp new file mode 100644 index 00000000000..1bb92af982b --- /dev/null +++ b/be/test/olap/compaction_task_test.cpp @@ -0,0 +1,132 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <gmock/gmock-actions.h> +#include <gmock/gmock-matchers.h> +#include <gtest/gtest-message.h> +#include <gtest/gtest-test-part.h> +#include <gtest/gtest.h> + +#include <filesystem> +#include <memory> + +#include "common/status.h" +#include "common/sync_point.h" +#include "gtest/gtest_pred_impl.h" +#include "io/fs/local_file_system.h" +#include "olap/cumulative_compaction_policy.h" +#include "olap/data_dir.h" +#include "olap/rowset/rowset_factory.h" +#include "olap/storage_engine.h" +#include "olap/tablet_manager.h" +#include "util/threadpool.h" + +namespace doris { +using namespace config; + +class CompactionTaskTest : public testing::Test { +public: + virtual void SetUp() { + _engine_data_path = "./be/test/olap/test_data/converter_test_data/tmp"; + auto st = io::global_local_filesystem()->delete_directory(_engine_data_path); + ASSERT_TRUE(st.ok()) << st; + st = io::global_local_filesystem()->create_directory(_engine_data_path); + ASSERT_TRUE(st.ok()) << st; + EXPECT_TRUE( + io::global_local_filesystem()->create_directory(_engine_data_path + "/meta").ok()); + + EngineOptions options; + options.backend_uid = UniqueId::gen_uid(); + _storage_engine = std::make_unique<StorageEngine>(options); + _data_dir = std::make_unique<DataDir>(*_storage_engine, _engine_data_path, 100000000); + static_cast<void>(_data_dir->init()); + } + + virtual void TearDown() { + EXPECT_TRUE(io::global_local_filesystem()->delete_directory(_engine_data_path).ok()); + ExecEnv::GetInstance()->set_storage_engine(nullptr); + } + + std::unique_ptr<StorageEngine> _storage_engine; + std::string _engine_data_path; + std::unique_ptr<DataDir> _data_dir; +}; + +static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping, + int data_size) { + auto rs_meta = std::make_shared<RowsetMeta>(); + rs_meta->set_rowset_type(BETA_ROWSET); // important + rs_meta->_rowset_meta_pb.set_start_version(version.first); + rs_meta->_rowset_meta_pb.set_end_version(version.second); + rs_meta->set_num_segments(num_segments); + rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING); + rs_meta->set_total_disk_size(data_size); + RowsetSharedPtr rowset; + Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset); + if (!st.ok()) { + return nullptr; + } + return rowset; +} + +TEST_F(CompactionTaskTest, TestSubmitCompactionTask) { + auto st = ThreadPoolBuilder("BaseCompactionTaskThreadPool") + .set_min_threads(2) + .set_max_threads(2) + .build(&_storage_engine->_base_compaction_thread_pool); + EXPECT_TRUE(st.OK()); + st = ThreadPoolBuilder("CumuCompactionTaskThreadPool") + .set_min_threads(2) + .set_max_threads(2) + .build(&_storage_engine->_cumu_compaction_thread_pool); + EXPECT_TRUE(st.OK()); + + auto* sp = SyncPoint::get_instance(); + sp->enable_processing(); + sp->set_call_back("olap_server::execute_compaction", [](auto&& values) { + std::this_thread::sleep_for(std::chrono::seconds(10)); + bool* pred = try_any_cast<bool*>(values.back()); + *pred = true; + }); + + for (int tablet_cnt = 0; tablet_cnt < 10; ++tablet_cnt) { + TabletMetaSharedPtr tablet_meta; + tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, + TCompressionType::LZ4F)); + TabletSharedPtr tablet(new Tablet(*(_storage_engine.get()), tablet_meta, _data_dir.get(), + CUMULATIVE_SIZE_BASED_POLICY)); + st = tablet->init(); + EXPECT_TRUE(st.OK()); + + for (int i = 2; i < 30; ++i) { + RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); + tablet->_rs_version_map.emplace(rs->version(), rs); + } + tablet->_cumulative_point = 2; + + st = _storage_engine->_submit_compaction_task(tablet, CompactionType::CUMULATIVE_COMPACTION, + false); + EXPECT_TRUE(st.OK()); + } + + int executing_task_num = _storage_engine->_get_executing_compaction_num( + _storage_engine->_tablet_submitted_cumu_compaction[_data_dir.get()]); + EXPECT_EQ(executing_task_num, 2); +} + +} // namespace doris diff --git a/be/test/olap/tablet_mgr_test.cpp b/be/test/olap/tablet_mgr_test.cpp index 69f17ecccba..c3cd74b55ed 100644 --- a/be/test/olap/tablet_mgr_test.cpp +++ b/be/test/olap/tablet_mgr_test.cpp @@ -421,7 +421,7 @@ TEST_F(TabletMgrTest, FindTabletWithCompact) { create_tablet(id, false, rowset_size++); } - std::unordered_set<TTabletId> cumu_set; + std::unordered_set<TabletSharedPtr> cumu_set; std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>> cumulative_compaction_policies; cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org