This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1aa9ac4fe44 Prevent making snapshot on remote rowset in single replica compaction (#28716) 1aa9ac4fe44 is described below commit 1aa9ac4fe44ee23db95a757d2d2ecf368d5774b9 Author: plat1ko <platonekos...@gmail.com> AuthorDate: Wed Dec 27 23:43:43 2023 +0800 Prevent making snapshot on remote rowset in single replica compaction (#28716) --- be/src/common/status.h | 2 - be/src/olap/base_compaction.cpp | 8 - be/src/olap/base_compaction.h | 2 - be/src/olap/compaction.h | 7 +- be/src/olap/cumulative_compaction.cpp | 8 - be/src/olap/cumulative_compaction.h | 2 - be/src/olap/full_compaction.cpp | 10 - be/src/olap/full_compaction.h | 2 - be/src/olap/olap_server.cpp | 61 +++-- be/src/olap/single_replica_compaction.cpp | 8 - be/src/olap/snapshot_manager.cpp | 4 +- be/src/olap/storage_engine.cpp | 22 -- be/src/olap/storage_engine.h | 12 - be/src/olap/tablet.cpp | 289 ++++++++------------- be/src/olap/tablet.h | 37 +-- be/src/olap/task/engine_clone_task.cpp | 1 - be/src/olap/task/engine_storage_migration_task.cpp | 2 - 17 files changed, 162 insertions(+), 315 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index 14aec46cc22..c0d92152877 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -160,7 +160,6 @@ namespace ErrorCode { E(BE_INVALID_NEED_MERGED_VERSIONS, -810, true); \ E(BE_ERROR_DELETE_ACTION, -811, true); \ E(BE_SEGMENTS_OVERLAPPING, -812, true); \ - E(BE_CLONE_OCCURRED, -813, true); \ E(PUSH_INIT_ERROR, -900, true); \ E(PUSH_VERSION_INCORRECT, -902, true); \ E(PUSH_SCHEMA_MISMATCH, -903, true); \ @@ -228,7 +227,6 @@ namespace ErrorCode { E(CUMULATIVE_INVALID_NEED_MERGED_VERSIONS, -2004, true); \ E(CUMULATIVE_ERROR_DELETE_ACTION, -2005, true); \ E(CUMULATIVE_MISS_VERSION, -2006, true); \ - E(CUMULATIVE_CLONE_OCCURRED, -2007, true); \ E(FULL_NO_SUITABLE_VERSION, -2008, false); \ E(FULL_MISS_VERSION, -2009, true); \ E(META_INVALID_ARGUMENT, -3000, true); \ diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 6ae006709a7..474909cbf45 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -56,7 +56,6 @@ Status BaseCompaction::prepare_compact() { // 1. pick rowsets to compact RETURN_IF_ERROR(pick_rowsets_to_compact()); COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size()); - _tablet->set_clone_occurred(false); return Status::OK(); } @@ -73,13 +72,6 @@ Status BaseCompaction::execute_compact_impl() { "another base compaction is running. tablet={}", _tablet->tablet_id()); } - // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked - // for compaction may change. In this case, current compaction task should not be executed. - if (_tablet->get_clone_occurred()) { - _tablet->set_clone_occurred(false); - return Status::Error<BE_CLONE_OCCURRED, false>("get_clone_occurred failed"); - } - SCOPED_ATTACH_TASK(_mem_tracker); // 2. do base compaction, merge rowsets diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h index 73aca0d5e1f..e86cb3330a5 100644 --- a/be/src/olap/base_compaction.h +++ b/be/src/olap/base_compaction.h @@ -43,8 +43,6 @@ public: Status prepare_compact() override; Status execute_compact_impl() override; - std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; } - protected: Status pick_rowsets_to_compact() override; std::string compaction_name() const override { return "base compaction"; } diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 2aff05ced83..a8e3b1a5c28 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -58,6 +58,8 @@ public: virtual Status prepare_compact() = 0; Status execute_compact(); virtual Status execute_compact_impl() = 0; + + const std::vector<RowsetSharedPtr>& input_rowsets() { return _input_rowsets; } #ifdef BE_TEST void set_input_rowset(const std::vector<RowsetSharedPtr>& rowsets); RowsetSharedPtr output_rowset(); @@ -65,10 +67,11 @@ public: RuntimeProfile* runtime_profile() const { return _profile.get(); } + virtual ReaderType compaction_type() const = 0; + virtual std::string compaction_name() const = 0; + protected: virtual Status pick_rowsets_to_compact() = 0; - virtual std::string compaction_name() const = 0; - virtual ReaderType compaction_type() const = 0; Status do_compaction(int64_t permits); Status do_compaction_impl(int64_t permits); diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 1c65df768d0..1f54c1f3285 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -61,7 +61,6 @@ Status CumulativeCompaction::prepare_compact() { // 2. pick rowsets to compact RETURN_IF_ERROR(pick_rowsets_to_compact()); COUNTER_UPDATE(_input_rowsets_counter, _input_rowsets.size()); - _tablet->set_clone_occurred(false); return Status::OK(); } @@ -73,13 +72,6 @@ Status CumulativeCompaction::execute_compact_impl() { "The tablet is under cumulative compaction. tablet={}", _tablet->tablet_id()); } - // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked - // for compaction may change. In this case, current compaction task should not be executed. - if (_tablet->get_clone_occurred()) { - _tablet->set_clone_occurred(false); - return Status::Error<CUMULATIVE_CLONE_OCCURRED, false>("get_clone_occurred failed"); - } - SCOPED_ATTACH_TASK(_mem_tracker); // 3. do cumulative compaction, merge rowsets diff --git a/be/src/olap/cumulative_compaction.h b/be/src/olap/cumulative_compaction.h index d74542a2ffe..7ea7fb383f1 100644 --- a/be/src/olap/cumulative_compaction.h +++ b/be/src/olap/cumulative_compaction.h @@ -39,8 +39,6 @@ public: Status prepare_compact() override; Status execute_compact_impl() override; - std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; } - protected: Status pick_rowsets_to_compact() override; diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 01bd5e3dc61..927b4a33198 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -51,29 +51,19 @@ Status FullCompaction::prepare_compact() { return Status::Error<INVALID_ARGUMENT, false>("Full compaction init failed"); } - std::unique_lock full_lock(_tablet->get_full_compaction_lock()); std::unique_lock base_lock(_tablet->get_base_compaction_lock()); std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock()); // 1. pick rowsets to compact RETURN_IF_ERROR(pick_rowsets_to_compact()); - _tablet->set_clone_occurred(false); return Status::OK(); } Status FullCompaction::execute_compact_impl() { - std::unique_lock full_lock(_tablet->get_full_compaction_lock()); std::unique_lock base_lock(_tablet->get_base_compaction_lock()); std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock()); - // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked - // for compaction may change. In this case, current compaction task should not be executed. - if (_tablet->get_clone_occurred()) { - _tablet->set_clone_occurred(false); - return Status::Error<BE_CLONE_OCCURRED, false>("get_clone_occurred failed"); - } - SCOPED_ATTACH_TASK(_mem_tracker); // 2. do full compaction, merge rowsets diff --git a/be/src/olap/full_compaction.h b/be/src/olap/full_compaction.h index bce9ac745b6..631d901e846 100644 --- a/be/src/olap/full_compaction.h +++ b/be/src/olap/full_compaction.h @@ -39,8 +39,6 @@ public: Status execute_compact_impl() override; Status modify_rowsets(const Merger::Statistics* stats = nullptr) override; - std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; } - protected: Status pick_rowsets_to_compact() override; std::string compaction_name() const override { return "full compaction"; } diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index f7caa7d8d02..7268d7959f9 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -57,6 +57,7 @@ #include "olap/olap_common.h" #include "olap/rowset/segcompaction.h" #include "olap/schema_change.h" +#include "olap/single_replica_compaction.h" #include "olap/storage_engine.h" #include "olap/tablet.h" #include "olap/tablet_manager.h" @@ -593,11 +594,6 @@ void StorageEngine::_compaction_tasks_producer_callback() { continue; } - /// Regardless of whether the tablet is submitted for compaction or not, - /// we need to call 'reset_compaction' to clean up the base_compaction or cumulative_compaction objects - /// in the tablet, because these two objects store the tablet's own shared_ptr. - /// If it is not cleaned up, the reference count of the tablet will always be greater than 1, - /// thus cannot be collected by the garbage collector. (TabletManager::start_trash_sweep) for (const auto& tablet : tablets_compaction) { if (compaction_type == CompactionType::BASE_COMPACTION) { tablet->set_last_base_compaction_schedule_time(UnixMillis()); @@ -717,33 +713,39 @@ Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tab return Status::AlreadyExist<false>( "compaction task has already been submitted, tablet_id={}", tablet->tablet_id()); } - Status st = tablet->prepare_single_replica_compaction(tablet, compaction_type); + + auto compaction = std::make_shared<SingleReplicaCompaction>(tablet, compaction_type); + auto st = compaction->prepare_compact(); + auto clean_single_replica_compaction = [tablet, this]() { - tablet->reset_single_replica_compaction(); _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION); _pop_tablet_from_submitted_compaction(tablet, CompactionType::BASE_COMPACTION); }; - if (st.ok()) { - auto submit_st = _single_replica_compaction_thread_pool->submit_func( - [tablet, compaction_type, clean_single_replica_compaction]() { - tablet->execute_single_replica_compaction(compaction_type); - clean_single_replica_compaction(); - }); - if (!submit_st.ok()) { - clean_single_replica_compaction(); - return Status::InternalError( - "failed to submit single replica compaction task to thread pool, " - "tablet_id={} ", - tablet->tablet_id()); + if (!st.ok()) { + clean_single_replica_compaction(); + if (!st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) { + LOG(WARNING) << "failed to prepare single replica compaction, tablet_id=" + << tablet->tablet_id() << " : " << st; + return st; } - return Status::OK(); - } else { + return Status::OK(); // No suitable version, regard as OK + } + + auto submit_st = _single_replica_compaction_thread_pool->submit_func( + [tablet, compaction = std::move(compaction), + clean_single_replica_compaction]() mutable { + tablet->execute_single_replica_compaction(*compaction); + clean_single_replica_compaction(); + }); + if (!submit_st.ok()) { clean_single_replica_compaction(); return Status::InternalError( - "failed to prepare single replica compaction task tablet_id={} ", + "failed to submit single replica compaction task to thread pool, " + "tablet_id={}", tablet->tablet_id()); } + return Status::OK(); } void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest* request, @@ -917,8 +919,10 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, "compaction task has already been submitted, tablet_id={}, compaction_type={}.", tablet->tablet_id(), compaction_type); } + std::shared_ptr<Compaction> compaction; int64_t permits = 0; - Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits); + Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet, + compaction, permits); bool is_low_priority_task = [&]() { // Can add more strategies to determine whether a task is a low priority task in the future if (!config::enable_compaction_priority_scheduling) { @@ -938,27 +942,24 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, (compaction_type == CompactionType::CUMULATIVE_COMPACTION) ? _cumu_compaction_thread_pool : _base_compaction_thread_pool; - auto st = thread_pool->submit_func([tablet, compaction_type, permits, is_low_priority_task, + auto st = thread_pool->submit_func([tablet, compaction = std::move(compaction), + compaction_type, permits, is_low_priority_task, this]() { if (is_low_priority_task && !_increase_low_priority_task_nums(tablet->data_dir())) { VLOG_DEBUG << "skip low priority compaction task for tablet: " << tablet->tablet_id(); // Todo: push task back } else { - tablet->execute_compaction(compaction_type); + tablet->execute_compaction(*compaction); if (is_low_priority_task) { _decrease_low_priority_task_nums(tablet->data_dir()); } } _permit_limiter.release(permits); - // reset compaction - tablet->reset_compaction(compaction_type); _pop_tablet_from_submitted_compaction(tablet, compaction_type); }); if (!st.ok()) { _permit_limiter.release(permits); - // reset compaction - tablet->reset_compaction(compaction_type); _pop_tablet_from_submitted_compaction(tablet, compaction_type); return Status::InternalError( "failed to submit compaction task to thread pool, " @@ -967,8 +968,6 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, } return Status::OK(); } else { - // reset compaction - tablet->reset_compaction(compaction_type); _pop_tablet_from_submitted_compaction(tablet, compaction_type); if (!st.ok()) { return Status::InternalError( diff --git a/be/src/olap/single_replica_compaction.cpp b/be/src/olap/single_replica_compaction.cpp index a5e060147d6..2a8edb60471 100644 --- a/be/src/olap/single_replica_compaction.cpp +++ b/be/src/olap/single_replica_compaction.cpp @@ -70,7 +70,6 @@ Status SingleReplicaCompaction::prepare_compact() { // 1. pick rowsets to compact RETURN_IF_ERROR(pick_rowsets_to_compact()); - _tablet->set_clone_occurred(false); if (_input_rowsets.size() == 1) { return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>("_input_rowsets.size() is 1"); } @@ -105,13 +104,6 @@ Status SingleReplicaCompaction::execute_compact_impl() { "another base compaction is running. tablet={}", _tablet->tablet_id()); } - // Clone task may happen after compaction task is submitted to thread pool, and rowsets picked - // for compaction may change. In this case, current compaction task should not be executed. - if (_tablet->get_clone_occurred()) { - _tablet->set_clone_occurred(false); - return Status::Error<BE_CLONE_OCCURRED, false>("get_clone_occurred failed"); - } - SCOPED_ATTACH_TASK(_mem_tracker); // 2. do single replica compaction diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index fd865712c5f..b5090f0e280 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -426,10 +426,10 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet << ref_tablet->tablet_id(); Version version(request.start_version, request.end_version); const RowsetSharedPtr rowset = ref_tablet->get_rowset_by_version(version, false); - if (rowset != nullptr) { + if (rowset && rowset->is_local()) { consistent_rowsets.push_back(rowset); } else { - LOG(WARNING) << "failed to find version when do compaction snapshot. " + LOG(WARNING) << "failed to find local version when do compaction snapshot. " << " tablet=" << request.tablet_id << " schema_hash=" << request.schema_hash << " version=" << version; diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp index 4e0fb2eddf1..cf4687caf3e 100644 --- a/be/src/olap/storage_engine.cpp +++ b/be/src/olap/storage_engine.cpp @@ -1256,28 +1256,6 @@ PendingRowsetGuard StorageEngine::add_pending_rowset(const RowsetWriterContext& return _pending_remote_rowsets.add(ctx.rowset_id); } -void StorageEngine::create_cumulative_compaction( - TabletSharedPtr best_tablet, std::shared_ptr<CumulativeCompaction>& cumulative_compaction) { - cumulative_compaction.reset(new CumulativeCompaction(best_tablet)); -} - -void StorageEngine::create_base_compaction(TabletSharedPtr best_tablet, - std::shared_ptr<BaseCompaction>& base_compaction) { - base_compaction.reset(new BaseCompaction(best_tablet)); -} - -void StorageEngine::create_full_compaction(TabletSharedPtr best_tablet, - std::shared_ptr<FullCompaction>& full_compaction) { - full_compaction.reset(new FullCompaction(best_tablet)); -} - -void StorageEngine::create_single_replica_compaction( - TabletSharedPtr best_tablet, - std::shared_ptr<SingleReplicaCompaction>& single_replica_compaction, - CompactionType compaction_type) { - single_replica_compaction.reset(new SingleReplicaCompaction(best_tablet, compaction_type)); -} - bool StorageEngine::get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token) { TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 6b028b01734..77d3efeaaf8 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -180,18 +180,6 @@ public: void get_tablet_rowset_versions(const PGetTabletVersionsRequest* request, PGetTabletVersionsResponse* response); - void create_cumulative_compaction(TabletSharedPtr best_tablet, - std::shared_ptr<CumulativeCompaction>& cumulative_compaction); - void create_base_compaction(TabletSharedPtr best_tablet, - std::shared_ptr<BaseCompaction>& base_compaction); - - void create_full_compaction(TabletSharedPtr best_tablet, - std::shared_ptr<FullCompaction>& full_compaction); - - void create_single_replica_compaction( - TabletSharedPtr best_tablet, - std::shared_ptr<SingleReplicaCompaction>& single_replica_compaction, - CompactionType compaction_type); bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token); bool should_fetch_from_peer(int64_t tablet_id); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 5f915ad1de1..8cfce2d35ec 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -69,6 +69,7 @@ #include "io/fs/file_writer.h" #include "io/fs/path.h" #include "io/fs/remote_file_system.h" +#include "io/io_common.h" #include "olap/base_compaction.h" #include "olap/base_tablet.h" #include "olap/binlog.h" @@ -143,21 +144,39 @@ using std::string; using std::vector; using io::FileSystemSPtr; -static bvar::LatencyRecorder g_tablet_lookup_rowkey_latency("doris_pk", "tablet_lookup_rowkey"); -static bvar::LatencyRecorder g_tablet_commit_phase_update_delete_bitmap_latency( +namespace { + +bvar::LatencyRecorder g_tablet_lookup_rowkey_latency("doris_pk", "tablet_lookup_rowkey"); +bvar::LatencyRecorder g_tablet_commit_phase_update_delete_bitmap_latency( "doris_pk", "commit_phase_update_delete_bitmap"); -static bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", - "update_delete_bitmap"); -static bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found"); -static bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second( +bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", "update_delete_bitmap"); +bvar::Adder<uint64_t> g_tablet_pk_not_found("doris_pk", "lookup_not_found"); +bvar::PerSecond<bvar::Adder<uint64_t>> g_tablet_pk_not_found_per_second( "doris_pk", "lookup_not_found_per_second", &g_tablet_pk_not_found, 60); -const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD = 1s; - bvar::Adder<uint64_t> exceed_version_limit_counter; bvar::Window<bvar::Adder<uint64_t>> exceed_version_limit_counter_minute( &exceed_version_limit_counter, 60); +void set_last_failure_time(Tablet* tablet, const Compaction& compaction, int64_t ms) { + switch (compaction.compaction_type()) { + case ReaderType::READER_CUMULATIVE_COMPACTION: + tablet->set_last_cumu_compaction_failure_time(ms); + return; + case ReaderType::READER_BASE_COMPACTION: + tablet->set_last_base_compaction_failure_time(ms); + return; + case ReaderType::READER_FULL_COMPACTION: + tablet->set_last_full_compaction_failure_time(ms); + return; + default: + LOG(FATAL) << "invalid compaction type " << compaction.compaction_name() + << " tablet_id: " << tablet->tablet_id(); + } +}; + +} // namespace + struct WriteCooldownMetaExecutors { WriteCooldownMetaExecutors(size_t executor_nums = 5); @@ -251,7 +270,6 @@ Tablet::Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir, _newly_created_rowset_num(0), _last_checkpoint_time(0), _cumulative_compaction_type(cumulative_compaction_type), - _is_clone_occurred(false), _is_tablet_path_exists(true), _last_missed_version(-1), _last_missed_time_s(0) { @@ -475,6 +493,22 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add, return Status::OK(); } + if (check_delete) { + for (auto&& rs : to_delete) { + if (auto it = _rs_version_map.find(rs->version()); it == _rs_version_map.end()) { + return Status::Error<DELETE_VERSION_ERROR>( + "try to delete not exist version {} from {}", rs->version().to_string(), + tablet_id()); + } else if (rs->rowset_id() != it->second->rowset_id()) { + return Status::Error<DELETE_VERSION_ERROR>( + "try to delete version {} from {}, but rowset id changed, delete rowset id " + "is {}, exists rowsetid is {}", + rs->version().to_string(), tablet_id(), rs->rowset_id().to_string(), + it->second->rowset_id().to_string()); + } + } + } + bool same_version = true; std::sort(to_add.begin(), to_add.end(), Rowset::comparator); std::sort(to_delete.begin(), to_delete.end(), Rowset::comparator); @@ -489,23 +523,6 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add, same_version = false; } - if (check_delete) { - for (auto& rs : to_delete) { - auto find_rs = _rs_version_map.find(rs->version()); - if (find_rs == _rs_version_map.end()) { - return Status::Error<DELETE_VERSION_ERROR>( - "try to delete not exist version {} from {}", rs->version().to_string(), - tablet_id()); - } else if (find_rs->second->rowset_id() != rs->rowset_id()) { - return Status::Error<DELETE_VERSION_ERROR>( - "try to delete version {} from {}, but rowset id changed, delete rowset id " - "is {}, exists rowsetid is {}", - rs->version().to_string(), tablet_id(), rs->rowset_id().to_string(), - find_rs->second->rowset_id().to_string()); - } - } - } - std::vector<RowsetMetaSharedPtr> rs_metas_to_delete; for (auto& rs : to_delete) { rs_metas_to_delete.push_back(rs->rowset_meta()); @@ -1755,138 +1772,100 @@ void Tablet::generate_tablet_meta_copy_unlocked(TabletMetaSharedPtr new_tablet_m } Status Tablet::prepare_compaction_and_calculate_permits(CompactionType compaction_type, - TabletSharedPtr tablet, int64_t* permits) { - std::vector<RowsetSharedPtr> compaction_rowsets; + const TabletSharedPtr& tablet, + std::shared_ptr<Compaction>& compaction, + int64_t& permits) { if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { MonotonicStopWatch watch; watch.start(); - SCOPED_CLEANUP({ - if (!config::disable_compaction_trace_log && - watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) { - std::stringstream ss; - _cumulative_compaction->runtime_profile()->pretty_print(&ss); - LOG(WARNING) << "prepare cumulative compaction cost " << watch.elapsed_time() / 1e9 - << std::endl - << ss.str(); - } - }); - StorageEngine::instance()->create_cumulative_compaction(tablet, _cumulative_compaction); + compaction = std::make_shared<CumulativeCompaction>(tablet); DorisMetrics::instance()->cumulative_compaction_request_total->increment(1); - Status res = _cumulative_compaction->prepare_compact(); + Status res = compaction->prepare_compact(); + if (!config::disable_compaction_trace_log && + watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) { + std::stringstream ss; + compaction->runtime_profile()->pretty_print(&ss); + LOG(WARNING) << "prepare cumulative compaction cost " << watch.elapsed_time() / 1e9 + << std::endl + << ss.str(); + } + if (!res.ok()) { - set_last_cumu_compaction_failure_time(UnixMillis()); - *permits = 0; + tablet->set_last_cumu_compaction_failure_time(UnixMillis()); + permits = 0; if (!res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) { DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); - return Status::InternalError("prepare cumulative compaction with err: {}", - res.to_string()); + return Status::InternalError("prepare cumulative compaction with err: {}", res); } // return OK if OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION, so that we don't need to // print too much useless logs. // And because we set permits to 0, so even if we return OK here, nothing will be done. return Status::OK(); } - compaction_rowsets = _cumulative_compaction->get_input_rowsets(); } else if (compaction_type == CompactionType::BASE_COMPACTION) { - DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION); MonotonicStopWatch watch; watch.start(); - SCOPED_CLEANUP({ - if (!config::disable_compaction_trace_log && - watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) { - std::stringstream ss; - _base_compaction->runtime_profile()->pretty_print(&ss); - LOG(WARNING) << "prepare base compaction cost " << watch.elapsed_time() / 1e9 - << std::endl - << ss.str(); - } - }); - StorageEngine::instance()->create_base_compaction(tablet, _base_compaction); + compaction = std::make_shared<BaseCompaction>(tablet); DorisMetrics::instance()->base_compaction_request_total->increment(1); - Status res = _base_compaction->prepare_compact(); - set_last_base_compaction_status(res.to_string()); + Status res = compaction->prepare_compact(); + if (!config::disable_compaction_trace_log && + watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) { + std::stringstream ss; + compaction->runtime_profile()->pretty_print(&ss); + LOG(WARNING) << "prepare base compaction cost " << watch.elapsed_time() / 1e9 + << std::endl + << ss.str(); + } + + tablet->set_last_base_compaction_status(res.to_string()); if (!res.ok()) { - set_last_base_compaction_failure_time(UnixMillis()); - *permits = 0; + tablet->set_last_base_compaction_failure_time(UnixMillis()); + permits = 0; if (!res.is<BE_NO_SUITABLE_VERSION>()) { DorisMetrics::instance()->base_compaction_request_failed->increment(1); - return Status::InternalError("prepare base compaction with err: {}", - res.to_string()); + return Status::InternalError("prepare base compaction with err: {}", res); } // return OK if OLAP_ERR_BE_NO_SUITABLE_VERSION, so that we don't need to // print too much useless logs. // And because we set permits to 0, so even if we return OK here, nothing will be done. return Status::OK(); } - compaction_rowsets = _base_compaction->get_input_rowsets(); } else { DCHECK_EQ(compaction_type, CompactionType::FULL_COMPACTION); - MonotonicStopWatch watch; - watch.start(); - StorageEngine::instance()->create_full_compaction(tablet, _full_compaction); - Status res = _full_compaction->prepare_compact(); + + compaction = std::make_shared<FullCompaction>(tablet); + Status res = compaction->prepare_compact(); if (!res.ok()) { - set_last_full_compaction_failure_time(UnixMillis()); - *permits = 0; + tablet->set_last_full_compaction_failure_time(UnixMillis()); + permits = 0; if (!res.is<FULL_NO_SUITABLE_VERSION>()) { - return Status::InternalError("prepare full compaction with err: {}", - res.to_string()); + return Status::InternalError("prepare full compaction with err: {}", res); } // return OK if OLAP_ERR_BE_NO_SUITABLE_VERSION, so that we don't need to // print too much useless logs. // And because we set permits to 0, so even if we return OK here, nothing will be done. return Status::OK(); } - compaction_rowsets = _full_compaction->get_input_rowsets(); - } - *permits = 0; - for (auto rowset : compaction_rowsets) { - *permits += rowset->rowset_meta()->get_compaction_score(); } - return Status::OK(); -} -Status Tablet::prepare_single_replica_compaction(TabletSharedPtr tablet, - CompactionType compaction_type) { - StorageEngine::instance()->create_single_replica_compaction(tablet, _single_replica_compaction, - compaction_type); - Status res = _single_replica_compaction->prepare_compact(); - if (!res.ok()) { - if (!res.is<CUMULATIVE_NO_SUITABLE_VERSION>()) { - return Status::InternalError("prepare single replica compaction with err: {}", - res.to_string()); - } + permits = 0; + for (auto&& rowset : compaction->input_rowsets()) { + permits += rowset->rowset_meta()->get_compaction_score(); } return Status::OK(); } -void Tablet::execute_single_replica_compaction(CompactionType compaction_type) { - Status res = _single_replica_compaction->execute_compact(); +void Tablet::execute_single_replica_compaction(SingleReplicaCompaction& compaction) { + Status res = compaction.execute_compact(); if (!res.ok()) { - if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - set_last_cumu_compaction_failure_time(UnixMillis()); - } else if (compaction_type == CompactionType::BASE_COMPACTION) { - set_last_base_compaction_failure_time(UnixMillis()); - } else if (compaction_type == CompactionType::FULL_COMPACTION) { - set_last_full_compaction_failure_time(UnixMillis()); - } + set_last_failure_time(this, compaction, UnixMillis()); LOG(WARNING) << "failed to do single replica compaction. res=" << res << ", tablet=" << tablet_id(); return; } - if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - set_last_cumu_compaction_failure_time(0); - } else if (compaction_type == CompactionType::BASE_COMPACTION) { - set_last_base_compaction_failure_time(0); - } else if (compaction_type == CompactionType::FULL_COMPACTION) { - set_last_full_compaction_failure_time(0); - } -} - -void Tablet::reset_single_replica_compaction() { - _single_replica_compaction.reset(); + set_last_failure_time(this, compaction, 0); } std::vector<Version> Tablet::get_all_versions() { @@ -1903,78 +1882,38 @@ std::vector<Version> Tablet::get_all_versions() { return local_versions; } -void Tablet::execute_compaction(CompactionType compaction_type) { +void Tablet::execute_compaction(Compaction& compaction) { signal::tablet_id = tablet_id(); - if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - MonotonicStopWatch watch; - watch.start(); - SCOPED_CLEANUP({ - if (!config::disable_compaction_trace_log && - watch.elapsed_time() / 1e9 > config::cumulative_compaction_trace_threshold) { - std::stringstream ss; - _cumulative_compaction->runtime_profile()->pretty_print(&ss); - LOG(WARNING) << "execute cumulative compaction cost " << watch.elapsed_time() / 1e9 - << std::endl - << ss.str(); - } - }); - Status res = _cumulative_compaction->execute_compact(); - if (!res.ok()) { - set_last_cumu_compaction_failure_time(UnixMillis()); - DorisMetrics::instance()->cumulative_compaction_request_failed->increment(1); - LOG(WARNING) << "failed to do cumulative compaction. res=" << res - << ", tablet=" << tablet_id(); - return; - } - set_last_cumu_compaction_failure_time(0); - } else if (compaction_type == CompactionType::BASE_COMPACTION) { - DCHECK_EQ(compaction_type, CompactionType::BASE_COMPACTION); - MonotonicStopWatch watch; - watch.start(); - SCOPED_CLEANUP({ - if (!config::disable_compaction_trace_log && - watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold) { - std::stringstream ss; - _base_compaction->runtime_profile()->pretty_print(&ss); - LOG(WARNING) << "execute base compaction cost " << watch.elapsed_time() / 1e9 - << std::endl - << ss.str(); - } - }); + MonotonicStopWatch watch; + watch.start(); - Status res = _base_compaction->execute_compact(); - set_last_base_compaction_status(res.to_string()); - if (!res.ok()) { - set_last_base_compaction_failure_time(UnixMillis()); - DorisMetrics::instance()->base_compaction_request_failed->increment(1); - LOG(WARNING) << "failed to do base compaction. res=" << res - << ", tablet=" << tablet_id(); - return; - } - set_last_base_compaction_failure_time(0); + Status res = compaction.execute_compact(); + + if (!res.ok()) [[unlikely]] { + set_last_failure_time(this, compaction, UnixMillis()); + LOG(WARNING) << "failed to do " << compaction.compaction_name() + << ", tablet=" << tablet_id() << " : " << res; } else { - DCHECK_EQ(compaction_type, CompactionType::FULL_COMPACTION); - MonotonicStopWatch watch; - watch.start(); - Status res = _full_compaction->execute_compact(); - if (!res.ok()) { - set_last_full_compaction_failure_time(UnixMillis()); - LOG(WARNING) << "failed to do full compaction. res=" << res - << ", tablet=" << tablet_id(); - return; - } - set_last_full_compaction_failure_time(0); + set_last_failure_time(this, compaction, 0); } -} -void Tablet::reset_compaction(CompactionType compaction_type) { - if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { - _cumulative_compaction.reset(); - } else if (compaction_type == CompactionType::BASE_COMPACTION) { - _base_compaction.reset(); - } else { - _full_compaction.reset(); + if (!config::disable_compaction_trace_log) { + auto need_trace = [&compaction, &watch] { + return compaction.compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION + ? watch.elapsed_time() / 1e9 > + config::cumulative_compaction_trace_threshold + : compaction.compaction_type() == ReaderType::READER_BASE_COMPACTION + ? watch.elapsed_time() / 1e9 > config::base_compaction_trace_threshold + : false; + }; + if (need_trace()) { + std::stringstream ss; + compaction.runtime_profile()->pretty_print(&ss); + LOG(WARNING) << "execute " << compaction.compaction_name() << " cost " + << watch.elapsed_time() / 1e9 << std::endl + << ss.str(); + } } } diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 4174a0ec26d..614288921ed 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -59,9 +59,7 @@ namespace doris { class Tablet; class CumulativeCompactionPolicy; -class CumulativeCompaction; -class BaseCompaction; -class FullCompaction; +class Compaction; class SingleReplicaCompaction; class RowsetWriter; struct RowsetWriterContext; @@ -85,7 +83,7 @@ enum SortType : int; enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_REMOTE_AND_LOCAL }; -extern const std::chrono::seconds TRACE_TABLET_LOCK_THRESHOLD; +static inline constexpr auto TRACE_TABLET_LOCK_THRESHOLD = std::chrono::seconds(1); class Tablet final : public BaseTablet { public: @@ -146,6 +144,8 @@ public: // operation in rowsets Status add_rowset(RowsetSharedPtr rowset); Status create_initial_rowset(const int64_t version); + + // MUST hold EXCLUSIVE `_meta_lock`. Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add, std::vector<RowsetSharedPtr>& to_delete, bool check_delete = false); @@ -194,7 +194,6 @@ public: std::mutex& get_push_lock() { return _ingest_lock; } std::mutex& get_base_compaction_lock() { return _base_compaction_lock; } std::mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; } - std::mutex& get_full_compaction_lock() { return _full_compaction_lock; } std::shared_mutex& get_migration_lock() { return _migration_lock; } @@ -296,18 +295,13 @@ public: // return a json string to show the compaction status of this tablet void get_compaction_status(std::string* json_result); - Status prepare_compaction_and_calculate_permits(CompactionType compaction_type, - TabletSharedPtr tablet, int64_t* permits); - - Status prepare_single_replica_compaction(TabletSharedPtr tablet, - CompactionType compaction_type); - void execute_compaction(CompactionType compaction_type); - void reset_compaction(CompactionType compaction_type); - void execute_single_replica_compaction(CompactionType compaction_type); - void reset_single_replica_compaction(); + static Status prepare_compaction_and_calculate_permits(CompactionType compaction_type, + const TabletSharedPtr& tablet, + std::shared_ptr<Compaction>& compaction, + int64_t& permits); - void set_clone_occurred(bool clone_occurred) { _is_clone_occurred = clone_occurred; } - bool get_clone_occurred() { return _is_clone_occurred; } + void execute_compaction(Compaction& compaction); + void execute_single_replica_compaction(SingleReplicaCompaction& compaction); void set_cumulative_compaction_policy( std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) { @@ -319,7 +313,7 @@ public: } void set_last_base_compaction_status(std::string status) { - _last_base_compaction_status = status; + _last_base_compaction_status = std::move(status); } std::string get_last_base_compaction_status() { return _last_base_compaction_status; } @@ -622,7 +616,6 @@ private: std::mutex _ingest_lock; std::mutex _base_compaction_lock; std::mutex _cumulative_compaction_lock; - std::mutex _full_compaction_lock; std::mutex _schema_change_lock; std::shared_mutex _migration_lock; std::mutex _build_inverted_index_lock; @@ -666,14 +659,6 @@ private: std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy; std::string_view _cumulative_compaction_type; - std::shared_ptr<CumulativeCompaction> _cumulative_compaction; - std::shared_ptr<BaseCompaction> _base_compaction; - std::shared_ptr<FullCompaction> _full_compaction; - std::shared_ptr<SingleReplicaCompaction> _single_replica_compaction; - - // whether clone task occurred during the tablet is in thread pool queue to wait for compaction - std::atomic<bool> _is_clone_occurred; - // use a seperate thread to check all tablets paths existance std::atomic<bool> _is_tablet_path_exists; diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp index 068e8723354..e27db6188bd 100644 --- a/be/src/olap/task/engine_clone_task.cpp +++ b/be/src/olap/task/engine_clone_task.cpp @@ -702,7 +702,6 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d std::lock_guard cumulative_compaction_lock(tablet->get_cumulative_compaction_lock()); std::lock_guard cold_compaction_lock(tablet->get_cold_compaction_lock()); std::lock_guard build_inverted_index_lock(tablet->get_build_inverted_index_lock()); - tablet->set_clone_occurred(true); std::lock_guard<std::mutex> push_lock(tablet->get_push_lock()); std::lock_guard<std::mutex> rwlock(tablet->get_rowset_update_lock()); std::lock_guard<std::shared_mutex> wrlock(tablet->get_header_lock()); diff --git a/be/src/olap/task/engine_storage_migration_task.cpp b/be/src/olap/task/engine_storage_migration_task.cpp index 8fba16c67df..9e8fc39abde 100644 --- a/be/src/olap/task/engine_storage_migration_task.cpp +++ b/be/src/olap/task/engine_storage_migration_task.cpp @@ -201,12 +201,10 @@ Status EngineStorageMigrationTask::_migrate() { // compaction will be prohibited for the mow table when migration. Moreover, it is useless // to perform a compaction operation on the migration data, as the migration still migrates // the data of rowsets before the compaction operation. - std::unique_lock full_compaction_lock(_tablet->get_full_compaction_lock(), std::defer_lock); std::unique_lock base_compaction_lock(_tablet->get_base_compaction_lock(), std::defer_lock); std::unique_lock cumu_compaction_lock(_tablet->get_cumulative_compaction_lock(), std::defer_lock); if (_tablet->enable_unique_key_merge_on_write()) { - full_compaction_lock.lock(); base_compaction_lock.lock(); cumu_compaction_lock.lock(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org