This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f631c388d [enhance](cooldown)accelerate cooldown task produce 
efficiency (#16089)
1f631c388d is described below

commit 1f631c388ddc6ecbddbf10de50f61d52d69c2932
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Fri Feb 10 16:58:27 2023 +0800

    [enhance](cooldown)accelerate cooldown task produce efficiency (#16089)
---
 .../exec/schema_scanner/schema_rowsets_scanner.cpp |  9 ---
 be/src/http/action/pad_rowset_action.cpp           |  1 -
 be/src/olap/compaction.cpp                         |  2 -
 be/src/olap/compaction.h                           |  1 -
 be/src/olap/delta_writer.cpp                       |  1 -
 be/src/olap/olap_server.cpp                        | 64 +++++++++-------------
 be/src/olap/push_handler.cpp                       |  1 -
 be/src/olap/rowset/beta_rowset_reader.h            |  1 -
 be/src/olap/rowset/beta_rowset_writer.cpp          |  9 ---
 be/src/olap/rowset/rowset.h                        |  2 +-
 be/src/olap/rowset/rowset_meta.h                   |  6 --
 be/src/olap/rowset/rowset_reader.h                 |  1 -
 be/src/olap/rowset/rowset_writer_context.h         |  1 -
 be/src/olap/schema_change.cpp                      | 10 +---
 be/src/olap/schema_change.h                        |  7 +--
 be/src/olap/snapshot_manager.cpp                   |  1 -
 be/src/olap/storage_engine.h                       |  3 +-
 be/src/olap/tablet.cpp                             | 20 +------
 be/src/olap/tablet_manager.cpp                     | 56 ++++++++++---------
 be/src/olap/tablet_manager.h                       |  3 +-
 be/src/util/threadpool.cpp                         |  6 --
 be/src/util/threadpool.h                           | 18 +++++-
 be/test/olap/tablet_test.cpp                       | 26 +++++----
 docs/en/docs/admin-manual/system-table/rowsets.md  |  1 -
 .../docs/admin-manual/system-table/rowsets.md      |  1 -
 .../java/org/apache/doris/catalog/SchemaTable.java |  1 -
 gensrc/proto/olap_file.proto                       |  4 +-
 27 files changed, 101 insertions(+), 155 deletions(-)

diff --git a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp 
b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
index 2744531bd7..5ee608acbb 100644
--- a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
+++ b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
@@ -44,7 +44,6 @@ std::vector<SchemaScanner::ColumnDesc> 
SchemaRowsetsScanner::_s_tbls_columns = {
         {"INDEX_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
         {"DATA_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true},
         {"CREATION_TIME", TYPE_BIGINT, sizeof(int64_t), true},
-        {"OLDEST_WRITE_TIMESTAMP", TYPE_BIGINT, sizeof(int64_t), true},
         {"NEWEST_WRITE_TIMESTAMP", TYPE_BIGINT, sizeof(int64_t), true},
 
 };
@@ -189,14 +188,6 @@ Status 
SchemaRowsetsScanner::_fill_block_impl(vectorized::Block* block) {
             fill_dest_column(block, &src, _s_tbls_columns[10]);
         }
     }
-    // OLDEST_WRITE_TIMESTAMP
-    {
-        for (int i = fill_idx_begin; i < fill_idx_end; ++i) {
-            RowsetSharedPtr rowset = rowsets_[i];
-            size_t src = rowset->oldest_write_timestamp();
-            fill_dest_column(block, &src, _s_tbls_columns[11]);
-        }
-    }
     // NEWEST_WRITE_TIMESTAMP
     {
         for (int i = fill_idx_begin; i < fill_idx_end; ++i) {
diff --git a/be/src/http/action/pad_rowset_action.cpp 
b/be/src/http/action/pad_rowset_action.cpp
index 225fec4dd2..c353b4fad7 100644
--- a/be/src/http/action/pad_rowset_action.cpp
+++ b/be/src/http/action/pad_rowset_action.cpp
@@ -91,7 +91,6 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet, 
const Version& versi
     ctx.rowset_state = VISIBLE;
     ctx.segments_overlap = NONOVERLAPPING;
     ctx.tablet_schema = tablet->tablet_schema();
-    ctx.oldest_write_timestamp = UnixSeconds();
     ctx.newest_write_timestamp = UnixSeconds();
     RETURN_IF_ERROR(tablet->create_rowset_writer(ctx, &writer));
     auto rowset = writer->build();
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index e15d263ded..f968f219fe 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -184,7 +184,6 @@ void Compaction::build_basic_info() {
     _output_version =
             Version(_input_rowsets.front()->start_version(), 
_input_rowsets.back()->end_version());
 
-    _oldest_write_timestamp = _input_rowsets.front()->oldest_write_timestamp();
     _newest_write_timestamp = _input_rowsets.back()->newest_write_timestamp();
 
     std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
@@ -364,7 +363,6 @@ Status Compaction::construct_output_rowset_writer(bool 
is_vertical) {
     ctx.rowset_state = VISIBLE;
     ctx.segments_overlap = NONOVERLAPPING;
     ctx.tablet_schema = _cur_tablet_schema;
-    ctx.oldest_write_timestamp = _oldest_write_timestamp;
     ctx.newest_write_timestamp = _newest_write_timestamp;
     if (is_vertical) {
         return _tablet->create_vertical_rowset_writer(ctx, &_output_rs_writer);
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 11ea2e1ec1..8501df9ef4 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -105,7 +105,6 @@ protected:
 
     Version _output_version;
 
-    int64_t _oldest_write_timestamp;
     int64_t _newest_write_timestamp;
     RowIdConversion _rowid_conversion;
     TabletSchemaSPtr _cur_tablet_schema;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index ee0b89570a..678d5006a0 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -137,7 +137,6 @@ Status DeltaWriter::init() {
     context.rowset_state = PREPARED;
     context.segments_overlap = OVERLAPPING;
     context.tablet_schema = _tablet_schema;
-    context.oldest_write_timestamp = UnixSeconds();
     context.newest_write_timestamp = UnixSeconds();
     context.tablet_id = _tablet->table_id();
     context.is_direct_write = true;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 9764680ff9..2bfffbbf29 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -692,60 +692,56 @@ Status StorageEngine::submit_seg_compaction_task(BetaRowsetWriter* writer,
 void StorageEngine::_cooldown_tasks_producer_callback() {
     int64_t interval = config::generate_cooldown_task_interval_sec;
     do {
-        if (_cooldown_thread_pool->get_queue_size() > 0) {
-            continue;
-        }
+        // these tablets are ordered by priority desc
         std::vector<TabletSharedPtr> tablets;
         // TODO(luwei) : a more efficient way to get cooldown tablets
-        _tablet_manager->get_cooldown_tablets(&tablets);
+        // get_cooldown_tablets() keeps a tablet only when this predicate returns true, so accept
+        // running tablets that do not already have a cooldown task queued or running.
+        auto is_cooldown_candidate = [this](const TabletSharedPtr& tablet) -> bool {
+            if (TABLET_RUNNING != tablet->tablet_state()) {
+                return false;
+            }
+            std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
+            return _running_cooldown_tablets.find(tablet->tablet_id()) ==
+                   _running_cooldown_tablets.end();
+        };
+        _tablet_manager->get_cooldown_tablets(&tablets, std::move(is_cooldown_candidate));
         LOG(INFO) << "cooldown producer get tablet num: " << tablets.size();
+        // tablets earlier in the list get a larger priority value
+        int max_priority = static_cast<int>(tablets.size());
         for (const auto& tablet : tablets) {
-            Status st = _cooldown_thread_pool->submit_func([tablet, tablets, this]() {
+            // mark the tablet as pending so later producer rounds do not enqueue it again
+            {
+                std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
+                _running_cooldown_tablets.insert(tablet->tablet_id());
+            }
+            PriorityThreadPool::Task task;
+            task.work_function = [tablet, task_size = tablets.size(), this]() {
+                Status st = tablet->cooldown();
                 {
-                    // Cooldown tasks on the same tablet cannot be executed concurrently
                     std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
-                    auto it = _running_cooldown_tablets.find(tablet->tablet_id());
-                    if (it != _running_cooldown_tablets.end()) {
-                        return;
-                    }
-
-                    // the number of concurrent cooldown tasks in each directory
-                    // cannot exceed the configured value
-                    auto dir_it = _running_cooldown_tasks_cnt.find(tablet->data_dir());
-                    if (dir_it != _running_cooldown_tasks_cnt.end() &&
-                        dir_it->second >= config::concurrency_per_dir) {
-                        return;
-                    }
-
-                    _running_cooldown_tablets.insert(tablet->tablet_id());
-                    dir_it = _running_cooldown_tasks_cnt.find(tablet->data_dir());
-                    if (dir_it != _running_cooldown_tasks_cnt.end()) {
-                        _running_cooldown_tasks_cnt[tablet->data_dir()]++;
-                    } else {
-                        _running_cooldown_tasks_cnt[tablet->data_dir()] = 1;
-                    }
+                    _running_cooldown_tablets.erase(tablet->tablet_id());
                 }
-
-                Status st = tablet->cooldown();
                 if (!st.ok()) {
                     LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id()
                                  << " err: " << st;
                 } else {
                     LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id()
                               << " cooldown progress ("
-                              << tablets.size() - _cooldown_thread_pool->get_queue_size() << "/"
-                              << tablets.size() << ")";
+                              << task_size - _cooldown_thread_pool->get_queue_size() << "/"
+                              << task_size << ")";
                 }
+            };
+            task.priority = max_priority--;
+            bool submitted = _cooldown_thread_pool->offer(std::move(task));
 
-                {
-                    std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
-                    _running_cooldown_tasks_cnt[tablet->data_dir()]--;
-                    _running_cooldown_tablets.erase(tablet->tablet_id());
-                }
-            });
-
-            if (!st.ok()) {
-                LOG(INFO) << "failed to submit cooldown task, err msg: " << st;
+            if (!submitted) {
+                {
+                    // never runs: drop the pending mark or the tablet is skipped forever
+                    std::lock_guard<std::mutex> lock(_running_cooldown_mutex);
+                    _running_cooldown_tablets.erase(tablet->tablet_id());
+                }
+                LOG(INFO) << "failed to submit cooldown task";
             }
         }
     } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 56c202a300..6ed44ea253 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -197,7 +197,6 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, 
RowsetSharedPtr* cur
         context.rowset_state = PREPARED;
         context.segments_overlap = OVERLAP_UNKNOWN;
         context.tablet_schema = tablet_schema;
-        context.oldest_write_timestamp = UnixSeconds();
         context.newest_write_timestamp = UnixSeconds();
         res = cur_tablet->create_rowset_writer(context, &rowset_writer);
         if (!res.ok()) {
diff --git a/be/src/olap/rowset/beta_rowset_reader.h 
b/be/src/olap/rowset/beta_rowset_reader.h
index 53bc73c156..e5223e5241 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -45,7 +45,6 @@ public:
 
     Version version() override { return _rowset->version(); }
 
-    int64_t oldest_write_timestamp() override { return 
_rowset->oldest_write_timestamp(); }
     int64_t newest_write_timestamp() override { return 
_rowset->newest_write_timestamp(); }
 
     RowsetSharedPtr rowset() override { return 
std::dynamic_pointer_cast<Rowset>(_rowset); }
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 7417af83e0..6d96b8445b 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -94,7 +94,6 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& 
rowset_writer_context)
         _rowset_meta->set_load_id(_context.load_id);
     } else {
         _rowset_meta->set_version(_context.version);
-        
_rowset_meta->set_oldest_write_timestamp(_context.oldest_write_timestamp);
         
_rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
     }
     _rowset_meta->set_tablet_uid(_context.tablet_uid);
@@ -646,10 +645,6 @@ Status BetaRowsetWriter::_wait_flying_segcompaction() {
 }
 
 RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& 
spec_rowset_meta) {
-    if (_rowset_meta->oldest_write_timestamp() == -1) {
-        _rowset_meta->set_oldest_write_timestamp(UnixSeconds());
-    }
-
     if (_rowset_meta->newest_write_timestamp() == -1) {
         _rowset_meta->set_newest_write_timestamp(UnixSeconds());
     }
@@ -701,10 +696,6 @@ RowsetSharedPtr BetaRowsetWriter::build() {
     DCHECK(_segment_writer == nullptr) << "segment must be null when build 
rowset";
     _build_rowset_meta(_rowset_meta);
 
-    if (_rowset_meta->oldest_write_timestamp() == -1) {
-        _rowset_meta->set_oldest_write_timestamp(UnixSeconds());
-    }
-
     if (_rowset_meta->newest_write_timestamp() == -1) {
         _rowset_meta->set_newest_write_timestamp(UnixSeconds());
     }
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 5fcc599bf8..196de0cfbb 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -148,7 +148,7 @@ public:
     int64_t num_segments() const { return rowset_meta()->num_segments(); }
     void to_rowset_pb(RowsetMetaPB* rs_meta) const { return 
rowset_meta()->to_rowset_pb(rs_meta); }
     RowsetMetaPB get_rowset_pb() const { return 
rowset_meta()->get_rowset_pb(); }
-    int64_t oldest_write_timestamp() const { return 
rowset_meta()->oldest_write_timestamp(); }
+    // The writing time of the newest data in rowset, to measure the freshness 
of a rowset.
     int64_t newest_write_timestamp() const { return 
rowset_meta()->newest_write_timestamp(); }
     bool is_segments_overlapping() const { return 
rowset_meta()->is_segments_overlapping(); }
     KeysType keys_type() { return _schema->keys_type(); }
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index bb122c9834..950323f152 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -354,16 +354,10 @@ public:
         }
     }
 
-    void set_oldest_write_timestamp(int64_t timestamp) {
-        _rowset_meta_pb.set_oldest_write_timestamp(timestamp);
-    }
-
     void set_newest_write_timestamp(int64_t timestamp) {
         _rowset_meta_pb.set_newest_write_timestamp(timestamp);
     }
 
-    int64_t oldest_write_timestamp() const { return 
_rowset_meta_pb.oldest_write_timestamp(); }
-
     int64_t newest_write_timestamp() const { return 
_rowset_meta_pb.newest_write_timestamp(); }
 
     void set_tablet_schema(const TabletSchemaSPtr& tablet_schema) {
diff --git a/be/src/olap/rowset/rowset_reader.h 
b/be/src/olap/rowset/rowset_reader.h
index 4186088fe5..4d979c6e21 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -63,7 +63,6 @@ public:
 
     virtual RowsetTypePB type() const = 0;
 
-    virtual int64_t oldest_write_timestamp() = 0;
     virtual int64_t newest_write_timestamp() = 0;
     virtual Status current_block_row_locations(std::vector<RowLocation>* 
locations) {
         return Status::NotSupported("to be implemented");
diff --git a/be/src/olap/rowset/rowset_writer_context.h 
b/be/src/olap/rowset/rowset_writer_context.h
index 60f569e362..8669a072de 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -73,7 +73,6 @@ struct RowsetWriterContext {
     // (because it hard to refactor, and RowsetConvertor will be deprecated in 
future)
     DataDir* data_dir = nullptr;
 
-    int64_t oldest_write_timestamp;
     int64_t newest_write_timestamp;
     bool enable_unique_key_merge_on_write = false;
     std::set<int32_t> skip_inverted_index;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 63a9a211eb..1efe95e11f 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -460,7 +460,6 @@ Status 
VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
 
     RowsetSharedPtr rowset = rowset_reader->rowset();
     SegmentsOverlapPB segments_overlap = 
rowset->rowset_meta()->segments_overlap();
-    int64_t oldest_write_timestamp = rowset->oldest_write_timestamp();
     int64_t newest_write_timestamp = rowset->newest_write_timestamp();
     _temp_delta_versions.first = _temp_delta_versions.second;
 
@@ -472,8 +471,7 @@ Status 
VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
         RowsetSharedPtr rowset;
         RETURN_IF_ERROR(_internal_sorting(
                 blocks, Version(_temp_delta_versions.second, 
_temp_delta_versions.second),
-                oldest_write_timestamp, newest_write_timestamp, new_tablet, 
BETA_ROWSET,
-                segments_overlap, &rowset));
+                newest_write_timestamp, new_tablet, BETA_ROWSET, 
segments_overlap, &rowset));
         src_rowsets.push_back(rowset);
 
         for (auto& block : blocks) {
@@ -529,8 +527,8 @@ Status 
VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
 
 Status VSchemaChangeWithSorting::_internal_sorting(
         const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const 
Version& version,
-        int64_t oldest_write_timestamp, int64_t newest_write_timestamp, 
TabletSharedPtr new_tablet,
-        RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, 
RowsetSharedPtr* rowset) {
+        int64_t newest_write_timestamp, TabletSharedPtr new_tablet, 
RowsetTypePB new_rowset_type,
+        SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) {
     uint64_t merged_rows = 0;
     MultiBlockMerger merger(new_tablet);
 
@@ -540,7 +538,6 @@ Status VSchemaChangeWithSorting::_internal_sorting(
     context.rowset_state = VISIBLE;
     context.segments_overlap = segments_overlap;
     context.tablet_schema = new_tablet->tablet_schema();
-    context.oldest_write_timestamp = oldest_write_timestamp;
     context.newest_write_timestamp = newest_write_timestamp;
     RETURN_IF_ERROR(new_tablet->create_rowset_writer(context, &rowset_writer));
 
@@ -1051,7 +1048,6 @@ Status 
SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
         context.rowset_state = VISIBLE;
         context.segments_overlap = 
rs_reader->rowset()->rowset_meta()->segments_overlap();
         context.tablet_schema = new_tablet->tablet_schema();
-        context.oldest_write_timestamp = rs_reader->oldest_write_timestamp();
         context.newest_write_timestamp = rs_reader->newest_write_timestamp();
         context.fs = rs_reader->rowset()->rowset_meta()->fs();
         Status status = new_tablet->create_rowset_writer(context, 
&rowset_writer);
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 099d1af81d..7920186a1f 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -158,10 +158,9 @@ private:
                           TabletSharedPtr new_tablet, TabletSchemaSPtr 
base_tablet_schema) override;
 
     Status _internal_sorting(const 
std::vector<std::unique_ptr<vectorized::Block>>& blocks,
-                             const Version& temp_delta_versions, int64_t 
oldest_write_timestamp,
-                             int64_t newest_write_timestamp, TabletSharedPtr 
new_tablet,
-                             RowsetTypePB new_rowset_type, SegmentsOverlapPB 
segments_overlap,
-                             RowsetSharedPtr* rowset);
+                             const Version& temp_delta_versions, int64_t 
newest_write_timestamp,
+                             TabletSharedPtr new_tablet, RowsetTypePB 
new_rowset_type,
+                             SegmentsOverlapPB segments_overlap, 
RowsetSharedPtr* rowset);
 
     Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, 
RowsetWriter* rowset_writer,
                              TabletSharedPtr new_tablet);
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index b6288530e9..d7166dd97d 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -261,7 +261,6 @@ Status SnapshotManager::_rename_rowset_id(const 
RowsetMetaPB& rs_meta_pb,
             org_rowset_meta->tablet_schema() ? 
org_rowset_meta->tablet_schema() : tablet_schema;
     context.rowset_state = org_rowset_meta->rowset_state();
     context.version = org_rowset_meta->version();
-    context.oldest_write_timestamp = org_rowset_meta->oldest_write_timestamp();
     context.newest_write_timestamp = org_rowset_meta->newest_write_timestamp();
     // keep segments_overlap same as origin rowset
     context.segments_overlap = rowset_meta->segments_overlap();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 6ccd446fae..b0a6b63b41 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -395,10 +395,9 @@ private:
 
     scoped_refptr<Thread> _cache_file_cleaner_tasks_producer_thread;
 
-    std::unique_ptr<ThreadPool> _cooldown_thread_pool;
+    std::unique_ptr<PriorityThreadPool> _cooldown_thread_pool;
 
     std::mutex _running_cooldown_mutex;
-    std::unordered_map<DataDir*, int64_t> _running_cooldown_tasks_cnt;
     std::unordered_set<int64_t> _running_cooldown_tablets;
 
     DISALLOW_COPY_AND_ASSIGN(StorageEngine);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 92b203b339..32c0cc3d98 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1556,7 +1556,6 @@ Status Tablet::create_initial_rowset(const int64_t 
req_version) {
         context.rowset_state = VISIBLE;
         context.segments_overlap = OVERLAP_UNKNOWN;
         context.tablet_schema = tablet_schema();
-        context.oldest_write_timestamp = UnixSeconds();
         context.newest_write_timestamp = UnixSeconds();
         res = create_rowset_writer(context, &rs_writer);
 
@@ -1890,14 +1889,6 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, 
size_t* file_size) {
         return false;
     }
 
-    int64_t oldest_cooldown_time = std::numeric_limits<int64_t>::max();
-    if (cooldown_ttl_sec >= 0) {
-        oldest_cooldown_time = rowset->oldest_write_timestamp() + 
cooldown_ttl_sec;
-    }
-    if (cooldown_datetime > 0) {
-        oldest_cooldown_time = std::min(oldest_cooldown_time, 
cooldown_datetime);
-    }
-
     int64_t newest_cooldown_time = std::numeric_limits<int64_t>::max();
     if (cooldown_ttl_sec >= 0) {
         newest_cooldown_time = rowset->newest_write_timestamp() + 
cooldown_ttl_sec;
@@ -1906,14 +1897,10 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, 
size_t* file_size) {
         newest_cooldown_time = std::min(newest_cooldown_time, 
cooldown_datetime);
     }
 
-    if (oldest_cooldown_time + config::cooldown_lag_time_sec < UnixSeconds()) {
-        *cooldown_timestamp = oldest_cooldown_time;
-        VLOG_DEBUG << "tablet need cooldown, tablet id: " << tablet_id()
-                   << " cooldown_timestamp: " << *cooldown_timestamp;
-        return true;
-    }
-
+    // The rowset needs cooldown once its cooldown time has passed, i.e. the earlier of
+    // (newest write time + cooldown_ttl_sec) and cooldown_datetime is before now.
     if (newest_cooldown_time < UnixSeconds()) {
+        *cooldown_timestamp = newest_cooldown_time;
         *file_size = rowset->data_disk_size();
         VLOG_DEBUG << "tablet need cooldown, tablet id: " << tablet_id()
                    << " file_size: " << *file_size;
@@ -1922,7 +1909,6 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, 
size_t* file_size) {
 
     VLOG_DEBUG << "tablet does not need cooldown, tablet id: " << tablet_id()
                << " ttl sec: " << cooldown_ttl_sec << " cooldown datetime: " 
<< cooldown_datetime
-               << " oldest write time: " << rowset->oldest_write_timestamp()
                << " newest write time: " << rowset->newest_write_timestamp();
     return false;
 }
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 87334cbfe2..407fb78520 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -1301,39 +1301,43 @@ struct SortCtx {
     SortCtx(TabletSharedPtr tablet, int64_t cooldown_timestamp, int64_t 
file_size)
             : tablet(tablet), cooldown_timestamp(cooldown_timestamp), 
file_size(file_size) {}
     TabletSharedPtr tablet;
-    int64_t cooldown_timestamp;
+    // unsigned so that a timestamp of -1 becomes UINT64_MAX and sorts last in operator<
+    uint64_t cooldown_timestamp;
     int64_t file_size;
+    bool operator<(const SortCtx& other) const {
+        if (this->cooldown_timestamp == other.cooldown_timestamp) {
+            return this->file_size > other.file_size;
+        }
+        return this->cooldown_timestamp < other.cooldown_timestamp;
+    }
 };
 
-void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* 
tablets) {
+void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets,
+                                         std::function<bool(const 
TabletSharedPtr&)> skip_tablet) {
     std::vector<SortCtx> sort_ctx_vec;
+    std::vector<std::weak_ptr<Tablet>> candidates;
     for (const auto& tablets_shard : _tablets_shards) {
         std::shared_lock rdlock(tablets_shard.lock);
-        for (const auto& item : tablets_shard.tablet_map) {
-            const TabletSharedPtr& tablet = item.second;
-            int64_t cooldown_timestamp = -1;
-            size_t file_size = -1;
-            if (tablet->need_cooldown(&cooldown_timestamp, &file_size)) {
-                sort_ctx_vec.emplace_back(tablet, cooldown_timestamp, 
file_size);
-            }
-        }
-    }
-
-    std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end(), [](SortCtx a, SortCtx 
b) {
-        if (a.cooldown_timestamp != -1 && b.cooldown_timestamp != -1) {
-            return a.cooldown_timestamp < b.cooldown_timestamp;
-        }
-
-        if (a.cooldown_timestamp != -1 && b.cooldown_timestamp == -1) {
-            return true;
-        }
-
-        if (a.cooldown_timestamp == -1 && b.cooldown_timestamp != -1) {
-            return false;
-        }
+        std::for_each(
+                tablets_shard.tablet_map.begin(), 
tablets_shard.tablet_map.end(),
+                [&candidates](auto& tablet_pair) { 
candidates.emplace_back(tablet_pair.second); });
+    }
+    std::for_each(
+            candidates.begin(), candidates.end(),
+            [&sort_ctx_vec, &skip_tablet](std::weak_ptr<Tablet>& t) {
+                const TabletSharedPtr& tablet = t.lock();
+                if (UNLIKELY(nullptr == tablet)) {
+                    return;
+                }
+                std::shared_lock rdlock(tablet->get_header_lock());
+                int64_t cooldown_timestamp = -1;
+                size_t file_size = -1;
+                if (skip_tablet(tablet) && 
tablet->need_cooldown(&cooldown_timestamp, &file_size)) {
+                    sort_ctx_vec.emplace_back(tablet, cooldown_timestamp, 
file_size);
+                }
+            });
 
-        return a.file_size > b.file_size;
-    });
+    std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end());
 
     for (SortCtx& ctx : sort_ctx_vec) {
         VLOG_DEBUG << "get cooldown tablet: " << ctx.tablet->tablet_id();
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 667013d268..7ae4060142 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -139,7 +139,8 @@ public:
     void get_tablets_distribution_on_different_disks(
             std::map<int64_t, std::map<DataDir*, int64_t>>& 
tablets_num_on_disk,
             std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& 
tablets_info_on_disk);
-    void get_cooldown_tablets(std::vector<TabletSharedPtr>* tables);
+    void get_cooldown_tablets(std::vector<TabletSharedPtr>* tables,
+                              std::function<bool(const TabletSharedPtr&)> 
skip_tablet);
 
     void get_all_tablets_storage_format(TCheckStorageFormatResult* result);
 
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index ff90604017..321825c891 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -73,12 +73,6 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int 
max_queue_size) {
     return *this;
 }
 
-Status ThreadPoolBuilder::build(std::unique_ptr<ThreadPool>* pool) const {
-    pool->reset(new ThreadPool(*this));
-    RETURN_IF_ERROR((*pool)->init());
-    return Status::OK();
-}
-
 ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode 
mode,
                                  int max_concurrency)
         : _mode(mode),
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index bf07e16846..89052274b0 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -27,11 +27,13 @@
 #include <functional>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <unordered_set>
 #include <utility>
 
 #include "common/status.h"
 #include "gutil/ref_counted.h"
+#include "util/priority_thread_pool.hpp"
 
 namespace doris {
 
@@ -105,7 +107,18 @@ public:
         return *this;
     }
     // Instantiate a new ThreadPool with the existing builder arguments.
-    Status build(std::unique_ptr<ThreadPool>* pool) const;
+    template <typename ThreadPoolType>
+    Status build(std::unique_ptr<ThreadPoolType>* pool) const {
+        if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) {
+            pool->reset(new ThreadPoolType(*this));
+            RETURN_IF_ERROR((*pool)->init());
+        } else if constexpr (std::is_same_v<ThreadPoolType, 
PriorityThreadPool>) {
+            pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, 
_name));
+        } else {
+            static_assert(always_false_v<ThreadPoolType>, "Unsupported 
ThreadPoolType");
+        }
+        return Status::OK();
+    }
 
 private:
     friend class ThreadPool;
@@ -117,6 +130,9 @@ private:
 
     ThreadPoolBuilder(const ThreadPoolBuilder&) = delete;
     void operator=(const ThreadPoolBuilder&) = delete;
+
+    template <typename T>
+    static constexpr bool always_false_v = false;
 };
 
 // Thread pool with a variable number of threads.
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index fe6435930a..9400e0e88c 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -111,10 +111,8 @@ public:
         pb1->set_tablet_schema(_tablet_meta->tablet_schema());
     }
 
-    void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end, 
int64_t earliest_ts,
-                      int64_t latest_ts) {
+    void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end, 
int64_t latest_ts) {
         pb1->init_from_json(_json_rowset_meta);
-        pb1->set_oldest_write_timestamp(earliest_ts);
         pb1->set_newest_write_timestamp(latest_ts);
         pb1->set_start_version(start);
         pb1->set_end_version(end);
@@ -282,27 +280,27 @@ TEST_F(TestTablet, pad_rowset) {
 TEST_F(TestTablet, cooldown_policy) {
     std::vector<RowsetMetaSharedPtr> rs_metas;
     RowsetMetaSharedPtr ptr1(new RowsetMeta());
-    init_rs_meta(ptr1, 0, 2, 100, 200);
+    init_rs_meta(ptr1, 0, 2, 200);
     rs_metas.push_back(ptr1);
     RowsetSharedPtr rowset1 = make_shared<BetaRowset>(nullptr, "", ptr1);
 
     RowsetMetaSharedPtr ptr2(new RowsetMeta());
-    init_rs_meta(ptr2, 3, 4, 300, 600);
+    init_rs_meta(ptr2, 3, 4, 600);
     rs_metas.push_back(ptr2);
     RowsetSharedPtr rowset2 = make_shared<BetaRowset>(nullptr, "", ptr2);
 
     RowsetMetaSharedPtr ptr3(new RowsetMeta());
-    init_rs_meta(ptr3, 5, 5, 800, 800);
+    init_rs_meta(ptr3, 5, 5, 800);
     rs_metas.push_back(ptr3);
     RowsetSharedPtr rowset3 = make_shared<BetaRowset>(nullptr, "", ptr3);
 
     RowsetMetaSharedPtr ptr4(new RowsetMeta());
-    init_rs_meta(ptr4, 6, 7, 1100, 1400);
+    init_rs_meta(ptr4, 6, 7, 1400);
     rs_metas.push_back(ptr4);
     RowsetSharedPtr rowset4 = make_shared<BetaRowset>(nullptr, "", ptr4);
 
     RowsetMetaSharedPtr ptr5(new RowsetMeta());
-    init_rs_meta(ptr5, 8, 9, 1800, 2000);
+    init_rs_meta(ptr5, 8, 9, 2000);
     rs_metas.push_back(ptr5);
     RowsetSharedPtr rowset5 = make_shared<BetaRowset>(nullptr, "", ptr5);
 
@@ -322,6 +320,7 @@ TEST_F(TestTablet, cooldown_policy) {
     _tablet->_rs_version_map[ptr5->version()] = rowset5;
 
     _tablet->set_cumulative_layer_point(20);
+    sleep(30);
 
     {
         auto storage_policy = std::make_shared<StoragePolicy>();
@@ -334,7 +333,7 @@ TEST_F(TestTablet, cooldown_policy) {
         bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
         ASSERT_TRUE(ret);
         ASSERT_EQ(cooldown_timestamp, 250);
-        ASSERT_EQ(file_size, -1);
+        ASSERT_EQ(file_size, 84699);
     }
 
     {
@@ -347,8 +346,8 @@ TEST_F(TestTablet, cooldown_policy) {
         size_t file_size = -1;
         bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
         ASSERT_TRUE(ret);
-        ASSERT_EQ(cooldown_timestamp, 3700);
-        ASSERT_EQ(file_size, -1);
+        ASSERT_EQ(cooldown_timestamp, 3800);
+        ASSERT_EQ(file_size, 84699);
     }
 
     {
@@ -374,8 +373,11 @@ TEST_F(TestTablet, cooldown_policy) {
         int64_t cooldown_timestamp = -1;
         size_t file_size = -1;
         bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size);
+        // the rowset with earliest version woule be picked up to do cooldown of which the timestamp
+        // is UnixSeconds() - 250
+        int64_t expect_cooldown_timestamp = UnixSeconds() - 50;
         ASSERT_TRUE(ret);
-        ASSERT_EQ(cooldown_timestamp, -1);
+        ASSERT_EQ(cooldown_timestamp, expect_cooldown_timestamp);
         ASSERT_EQ(file_size, 84699);
     }
 }
diff --git a/docs/en/docs/admin-manual/system-table/rowsets.md b/docs/en/docs/admin-manual/system-table/rowsets.md
index 7c1664be25..2ad7c7a5c6 100644
--- a/docs/en/docs/admin-manual/system-table/rowsets.md
+++ b/docs/en/docs/admin-manual/system-table/rowsets.md
@@ -51,7 +51,6 @@ MySQL [(none)]> desc information_schema.rowsets;
 | INDEX_DISK_SIZE        | BIGINT     | Yes  | false | NULL    |       |
 | DATA_DISK_SIZE         | BIGINT     | Yes  | false | NULL    |       |
 | CREATION_TIME          | BIGINT     | Yes  | false | NULL    |       |
-| OLDEST_WRITE_TIMESTAMP | BIGINT     | Yes  | false | NULL    |       |
 | NEWEST_WRITE_TIMESTAMP | BIGINT     | Yes  | false | NULL    |       |
 +------------------------+------------+------+-------+---------+-------+
 ```
diff --git a/docs/zh-CN/docs/admin-manual/system-table/rowsets.md b/docs/zh-CN/docs/admin-manual/system-table/rowsets.md
index c7579b0234..159c6dc288 100644
--- a/docs/zh-CN/docs/admin-manual/system-table/rowsets.md
+++ b/docs/zh-CN/docs/admin-manual/system-table/rowsets.md
@@ -55,7 +55,6 @@ MySQL [(none)]> desc information_schema.rowsets;
 | INDEX_DISK_SIZE        | BIGINT     | Yes  | false | NULL    |       |
 | DATA_DISK_SIZE         | BIGINT     | Yes  | false | NULL    |       |
 | CREATION_TIME          | BIGINT     | Yes  | false | NULL    |       |
-| OLDEST_WRITE_TIMESTAMP | BIGINT     | Yes  | false | NULL    |       |
 | NEWEST_WRITE_TIMESTAMP | BIGINT     | Yes  | false | NULL    |       |
 +------------------------+------------+------+-------+---------+-------+
 ```
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 9769006569..4f6ddb032e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -384,7 +384,6 @@ public class SchemaTable extends Table {
                                     .column("INDEX_DISK_SIZE", ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("DATA_DISK_SIZE", ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("CREATION_TIME", ScalarType.createType(PrimitiveType.BIGINT))
-                                    .column("OLDEST_WRITE_TIMESTAMP", ScalarType.createType(PrimitiveType.BIGINT))
                                     .column("NEWEST_WRITE_TIMESTAMP", ScalarType.createType(PrimitiveType.BIGINT))
                                     .build()))
             .put("backends", new SchemaTable(SystemIdGenerator.getNextId(), "backends", TableType.SCHEMA,
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 4452373335..4e65af4f06 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -100,8 +100,8 @@ message RowsetMetaPB {
     optional string rowset_id_v2 = 23;
     // resource id
     optional string resource_id = 24;
-    // earliest write time
-    optional int64 oldest_write_timestamp = 25 [default = -1];
+    // used to be oldest write time: earliest write time
+    reserved 25;
     // latest write time
     optional int64 newest_write_timestamp = 26 [default = -1];
     // the encoded segment min/max key of segments in this rowset,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to