This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bd4048f8fb [enhancement](compaction) add idle schedule and max_size limit for base compaction (#11542)
bd4048f8fb is described below

commit bd4048f8fbbfe4cc153101eb7fe57dc3405581a2
Author: yixiutt <102007456+yixi...@users.noreply.github.com>
AuthorDate: Sun Aug 7 16:21:57 2022 +0800

    [enhancement](compaction) add idle schedule and max_size limit for base compaction (#11542)
    
    Co-authored-by: yixiutt <yi...@selectdb.com>
---
 be/src/common/config.h          |  5 +++++
 be/src/olap/base_compaction.cpp | 30 ++++++++++++++++++++++++++----
 be/src/olap/base_compaction.h   |  4 ++++
 be/src/util/thread.cpp          | 17 +++++++++++++++++
 be/src/util/thread.h            |  2 ++
 5 files changed, 54 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5bafe0899c..dbeebf41bc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -262,6 +262,11 @@ CONF_mInt64(base_compaction_num_cumulative_deltas, "5");
 CONF_mDouble(base_cumulative_delta_ratio, "0.3");
 CONF_mInt64(base_compaction_interval_seconds_since_last_operation, "86400");
 CONF_mInt32(base_compaction_write_mbytes_per_sec, "5");
+CONF_Bool(enable_base_compaction_idle_sched, "true");
+
+// for dup key tablets, base compaction does not compact big files
+CONF_Bool(enable_dup_key_base_compaction_skip_big_file, "true");
+CONF_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");
 
 // config the cumulative compaction policy
 // Valid configs: num_based, size_based
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 2e8d1f82aa..22485c2b7a 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -49,6 +49,9 @@ Status BaseCompaction::prepare_compact() {
 }
 
 Status BaseCompaction::execute_compact_impl() {
+    if (config::enable_base_compaction_idle_sched) {
+        Thread::set_idle_sched();
+    }
     std::unique_lock<std::mutex> lock(_tablet->get_base_compaction_lock(), 
std::try_to_lock);
     if (!lock.owns_lock()) {
         LOG(WARNING) << "another base compaction is running. tablet=" << 
_tablet->full_name();
@@ -81,16 +84,35 @@ Status BaseCompaction::execute_compact_impl() {
     return Status::OK();
 }
 
+void BaseCompaction::_filter_input_rowset() {
+    // if skipping big files for dup key is enabled and there is no delete predicate,
+    // we skip big files to save resources
+    if (!config::enable_dup_key_base_compaction_skip_big_file ||
+        _tablet->keys_type() != KeysType::DUP_KEYS || 
_tablet->delete_predicates().size() != 0) {
+        return;
+    }
+    int64_t max_size = config::base_compaction_dup_key_max_file_size_mbytes * 
1024 * 1024;
+    // first find a proper rowset for start
+    auto rs_iter = _input_rowsets.begin();
+    while (rs_iter != _input_rowsets.end()) {
+        if ((*rs_iter)->rowset_meta()->total_disk_size() >= max_size) {
+            rs_iter = _input_rowsets.erase(rs_iter);
+        } else {
+            break;
+        }
+    }
+}
+
 Status BaseCompaction::pick_rowsets_to_compact() {
     _input_rowsets.clear();
     _tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets);
-    if (_input_rowsets.size() <= 1) {
-        return Status::OLAPInternalError(OLAP_ERR_BE_NO_SUITABLE_VERSION);
-    }
-
     std::sort(_input_rowsets.begin(), _input_rowsets.end(), 
Rowset::comparator);
     RETURN_NOT_OK(check_version_continuity(_input_rowsets));
     RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets));
+    _filter_input_rowset();
+    if (_input_rowsets.size() <= 1) {
+        return Status::OLAPInternalError(OLAP_ERR_BE_NO_SUITABLE_VERSION);
+    }
 
     // If there are delete predicate rowsets in tablet, start_version > 0 
implies some rowsets before
     // delete version cannot apply these delete predicates, which can cause 
incorrect query result.
diff --git a/be/src/olap/base_compaction.h b/be/src/olap/base_compaction.h
index 21a6e24bc3..96a4a362f4 100644
--- a/be/src/olap/base_compaction.h
+++ b/be/src/olap/base_compaction.h
@@ -47,6 +47,10 @@ private:
     // a rowset with overlapping segments should be compacted by cumulative 
compaction first.
     Status _check_rowset_overlapping(const vector<RowsetSharedPtr>& rowsets);
 
+    // filter input rowsets in some cases:
+    // 1. dup key without delete predicate
+    void _filter_input_rowset();
+
     DISALLOW_COPY_AND_ASSIGN(BaseCompaction);
 };
 
diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp
index 39060d645b..db8af4bb42 100644
--- a/be/src/util/thread.cpp
+++ b/be/src/util/thread.cpp
@@ -73,6 +73,8 @@ public:
 
     static void set_thread_name(const std::string& name, int64_t tid);
 
+    static void set_idle_sched(int64_t tid);
+
     // not the system TID, since pthread_t is less prone to being recycled.
     void add_thread(const pthread_t& pthread_id, const std::string& name,
                     const std::string& category, int64_t tid);
@@ -137,6 +139,17 @@ void ThreadMgr::set_thread_name(const std::string& name, 
int64_t tid) {
     }
 }
 
+void ThreadMgr::set_idle_sched(int64_t tid) {
+    if (tid == getpid()) {
+        return;
+    }
+    struct sched_param sp = {.sched_priority = 0};
+    int err = sched_setscheduler(0, SCHED_IDLE, &sp);
+    if (err < 0 && errno != EPERM) {
+        LOG(ERROR) << "set_thread_idle_sched";
+    }
+}
+
 void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& 
name,
                            const std::string& category, int64_t tid) {
     // These annotations cause TSAN to ignore the synchronization on lock_
@@ -262,6 +275,10 @@ void Thread::set_self_name(const std::string& name) {
     ThreadMgr::set_thread_name(name, current_thread_id());
 }
 
+void Thread::set_idle_sched() {
+    ThreadMgr::set_idle_sched(current_thread_id());
+}
+
 void Thread::join() {
     ThreadJoiner(this).join();
 }
diff --git a/be/src/util/thread.h b/be/src/util/thread.h
index 93ecf7faa0..766f43fc27 100644
--- a/be/src/util/thread.h
+++ b/be/src/util/thread.h
@@ -89,6 +89,8 @@ public:
 
     static void set_self_name(const std::string& name);
 
+    static void set_idle_sched();
+
     ~Thread();
 
     // Blocks until this thread finishes execution. Once this method returns, 
the thread


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to