This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 1f631c388d [enhance](cooldown)accelerate cooldown task produce efficiency (#16089) 1f631c388d is described below commit 1f631c388ddc6ecbddbf10de50f61d52d69c2932 Author: AlexYue <yj976240...@gmail.com> AuthorDate: Fri Feb 10 16:58:27 2023 +0800 [enhance](cooldown)accelerate cooldown task produce efficiency (#16089) --- .../exec/schema_scanner/schema_rowsets_scanner.cpp | 9 --- be/src/http/action/pad_rowset_action.cpp | 1 - be/src/olap/compaction.cpp | 2 - be/src/olap/compaction.h | 1 - be/src/olap/delta_writer.cpp | 1 - be/src/olap/olap_server.cpp | 64 +++++++++------------- be/src/olap/push_handler.cpp | 1 - be/src/olap/rowset/beta_rowset_reader.h | 1 - be/src/olap/rowset/beta_rowset_writer.cpp | 9 --- be/src/olap/rowset/rowset.h | 2 +- be/src/olap/rowset/rowset_meta.h | 6 -- be/src/olap/rowset/rowset_reader.h | 1 - be/src/olap/rowset/rowset_writer_context.h | 1 - be/src/olap/schema_change.cpp | 10 +--- be/src/olap/schema_change.h | 7 +-- be/src/olap/snapshot_manager.cpp | 1 - be/src/olap/storage_engine.h | 3 +- be/src/olap/tablet.cpp | 20 +------ be/src/olap/tablet_manager.cpp | 56 ++++++++++--------- be/src/olap/tablet_manager.h | 3 +- be/src/util/threadpool.cpp | 6 -- be/src/util/threadpool.h | 18 +++++- be/test/olap/tablet_test.cpp | 26 +++++---- docs/en/docs/admin-manual/system-table/rowsets.md | 1 - .../docs/admin-manual/system-table/rowsets.md | 1 - .../java/org/apache/doris/catalog/SchemaTable.java | 1 - gensrc/proto/olap_file.proto | 4 +- 27 files changed, 101 insertions(+), 155 deletions(-) diff --git a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp index 2744531bd7..5ee608acbb 100644 --- a/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp +++ b/be/src/exec/schema_scanner/schema_rowsets_scanner.cpp @@ -44,7 +44,6 @@ std::vector<SchemaScanner::ColumnDesc> SchemaRowsetsScanner::_s_tbls_columns = { {"INDEX_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true}, {"DATA_DISK_SIZE", TYPE_BIGINT, sizeof(size_t), true}, {"CREATION_TIME", TYPE_BIGINT, sizeof(int64_t), true}, - {"OLDEST_WRITE_TIMESTAMP", TYPE_BIGINT, sizeof(int64_t), true}, {"NEWEST_WRITE_TIMESTAMP", TYPE_BIGINT, sizeof(int64_t), true}, }; @@ -189,14 +188,6 @@ Status SchemaRowsetsScanner::_fill_block_impl(vectorized::Block* block) { fill_dest_column(block, &src, _s_tbls_columns[10]); } } - // OLDEST_WRITE_TIMESTAMP - { - for (int i = fill_idx_begin; i < fill_idx_end; ++i) { - RowsetSharedPtr rowset = rowsets_[i]; - size_t src = rowset->oldest_write_timestamp(); - fill_dest_column(block, &src, _s_tbls_columns[11]); - } - } // NEWEST_WRITE_TIMESTAMP { for (int i = fill_idx_begin; i < fill_idx_end; ++i) { diff --git a/be/src/http/action/pad_rowset_action.cpp b/be/src/http/action/pad_rowset_action.cpp index 225fec4dd2..c353b4fad7 100644 --- a/be/src/http/action/pad_rowset_action.cpp +++ b/be/src/http/action/pad_rowset_action.cpp @@ -91,7 +91,6 @@ Status PadRowsetAction::_pad_rowset(TabletSharedPtr tablet, const Version& versi ctx.rowset_state = VISIBLE; ctx.segments_overlap = NONOVERLAPPING; ctx.tablet_schema = tablet->tablet_schema(); - ctx.oldest_write_timestamp = UnixSeconds(); ctx.newest_write_timestamp = UnixSeconds(); RETURN_IF_ERROR(tablet->create_rowset_writer(ctx, &writer)); auto rowset = writer->build(); diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index e15d263ded..f968f219fe 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -184,7 +184,6 @@ void Compaction::build_basic_info() { _output_version = Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()); - _oldest_write_timestamp = _input_rowsets.front()->oldest_write_timestamp(); _newest_write_timestamp = _input_rowsets.back()->newest_write_timestamp(); std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size()); @@ -364,7 +363,6 @@ Status Compaction::construct_output_rowset_writer(bool is_vertical) { ctx.rowset_state = VISIBLE; ctx.segments_overlap = NONOVERLAPPING; ctx.tablet_schema = _cur_tablet_schema; - ctx.oldest_write_timestamp = _oldest_write_timestamp; ctx.newest_write_timestamp = _newest_write_timestamp; if (is_vertical) { return _tablet->create_vertical_rowset_writer(ctx, &_output_rs_writer); diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 11ea2e1ec1..8501df9ef4 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -105,7 +105,6 @@ protected: Version _output_version; - int64_t _oldest_write_timestamp; int64_t _newest_write_timestamp; RowIdConversion _rowid_conversion; TabletSchemaSPtr _cur_tablet_schema; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index ee0b89570a..678d5006a0 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -137,7 +137,6 @@ Status DeltaWriter::init() { context.rowset_state = PREPARED; context.segments_overlap = OVERLAPPING; context.tablet_schema = _tablet_schema; - context.oldest_write_timestamp = UnixSeconds(); context.newest_write_timestamp = UnixSeconds(); context.tablet_id = _tablet->table_id(); context.is_direct_write = true; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 9764680ff9..2bfffbbf29 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -692,60 +692,46 @@ Status StorageEngine::submit_seg_compaction_task(BetaRowsetWriter* writer, void StorageEngine::_cooldown_tasks_producer_callback() { int64_t interval = config::generate_cooldown_task_interval_sec; do { - if (_cooldown_thread_pool->get_queue_size() > 0) { - continue; - } + // these tables are ordered by priority desc std::vector<TabletSharedPtr> tablets; // TODO(luwei) : a more efficient way to get cooldown tablets - _tablet_manager->get_cooldown_tablets(&tablets); + // we should skip all the tablets which are not running and those pending to do cooldown + auto skip_tablet = [this](const TabletSharedPtr& tablet) -> bool { + std::lock_guard<std::mutex> lock(_running_cooldown_mutex); + return TABLET_RUNNING != tablet->tablet_state() || + _running_cooldown_tablets.find(tablet->tablet_id()) == + _running_cooldown_tablets.end(); + }; + _tablet_manager->get_cooldown_tablets(&tablets, std::move(skip_tablet)); LOG(INFO) << "cooldown producer get tablet num: " << tablets.size(); + int max_priority = tablets.size(); for (const auto& tablet : tablets) { - Status st = _cooldown_thread_pool->submit_func([tablet, tablets, this]() { + { + std::lock_guard<std::mutex> lock(_running_cooldown_mutex); + _running_cooldown_tablets.insert(tablet->tablet_id()); + } + PriorityThreadPool::Task task; + task.work_function = [tablet, task_size = tablets.size(), this]() { + Status st = tablet->cooldown(); { - // Cooldown tasks on the same tablet cannot be executed concurrently std::lock_guard<std::mutex> lock(_running_cooldown_mutex); - auto it = _running_cooldown_tablets.find(tablet->tablet_id()); - if (it != _running_cooldown_tablets.end()) { - return; - } - - // the number of concurrent cooldown tasks in each directory - // cannot exceed the configured value - auto dir_it = _running_cooldown_tasks_cnt.find(tablet->data_dir()); - if (dir_it != _running_cooldown_tasks_cnt.end() && - dir_it->second >= config::concurrency_per_dir) { - return; - } - - _running_cooldown_tablets.insert(tablet->tablet_id()); - dir_it = _running_cooldown_tasks_cnt.find(tablet->data_dir()); - if (dir_it != _running_cooldown_tasks_cnt.end()) { - _running_cooldown_tasks_cnt[tablet->data_dir()]++; - } else { - _running_cooldown_tasks_cnt[tablet->data_dir()] = 1; - } + _running_cooldown_tablets.erase(tablet->tablet_id()); } - - Status st = tablet->cooldown(); if (!st.ok()) { LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id() << " err: " << st; } else { LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id() << " cooldown progress (" - << tablets.size() - _cooldown_thread_pool->get_queue_size() << "/" - << tablets.size() << ")"; + << task_size - _cooldown_thread_pool->get_queue_size() << "/" + << task_size << ")"; } + }; + task.priority = max_priority--; + bool submited = _cooldown_thread_pool->offer(std::move(task)); - { - std::lock_guard<std::mutex> lock(_running_cooldown_mutex); - _running_cooldown_tasks_cnt[tablet->data_dir()]--; - _running_cooldown_tablets.erase(tablet->tablet_id()); - } - }); - - if (!st.ok()) { - LOG(INFO) << "failed to submit cooldown task, err msg: " << st; + if (submited) { + LOG(INFO) << "failed to submit cooldown task"; } } } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp index 56c202a300..6ed44ea253 100644 --- a/be/src/olap/push_handler.cpp +++ b/be/src/olap/push_handler.cpp @@ -197,7 +197,6 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur context.rowset_state = PREPARED; context.segments_overlap = OVERLAP_UNKNOWN; context.tablet_schema = tablet_schema; - context.oldest_write_timestamp = UnixSeconds(); context.newest_write_timestamp = UnixSeconds(); res = cur_tablet->create_rowset_writer(context, &rowset_writer); if (!res.ok()) { diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index 53bc73c156..e5223e5241 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -45,7 +45,6 @@ public: Version version() override { return _rowset->version(); } - int64_t oldest_write_timestamp() override { return _rowset->oldest_write_timestamp(); } int64_t newest_write_timestamp() override { return _rowset->newest_write_timestamp(); } RowsetSharedPtr rowset() override { return std::dynamic_pointer_cast<Rowset>(_rowset); } diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 7417af83e0..6d96b8445b 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -94,7 +94,6 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) _rowset_meta->set_load_id(_context.load_id); } else { _rowset_meta->set_version(_context.version); - _rowset_meta->set_oldest_write_timestamp(_context.oldest_write_timestamp); _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); } _rowset_meta->set_tablet_uid(_context.tablet_uid); @@ -646,10 +645,6 @@ Status BetaRowsetWriter::_wait_flying_segcompaction() { } RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { - if (_rowset_meta->oldest_write_timestamp() == -1) { - _rowset_meta->set_oldest_write_timestamp(UnixSeconds()); - } - if (_rowset_meta->newest_write_timestamp() == -1) { _rowset_meta->set_newest_write_timestamp(UnixSeconds()); } @@ -701,10 +696,6 @@ RowsetSharedPtr BetaRowsetWriter::build() { DCHECK(_segment_writer == nullptr) << "segment must be null when build rowset"; _build_rowset_meta(_rowset_meta); - if (_rowset_meta->oldest_write_timestamp() == -1) { - _rowset_meta->set_oldest_write_timestamp(UnixSeconds()); - } - if (_rowset_meta->newest_write_timestamp() == -1) { _rowset_meta->set_newest_write_timestamp(UnixSeconds()); } diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 5fcc599bf8..196de0cfbb 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -148,7 +148,7 @@ public: int64_t num_segments() const { return rowset_meta()->num_segments(); } void to_rowset_pb(RowsetMetaPB* rs_meta) const { return rowset_meta()->to_rowset_pb(rs_meta); } RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); } - int64_t oldest_write_timestamp() const { return rowset_meta()->oldest_write_timestamp(); } + // The writing time of the newest data in rowset, to measure the freshness of a rowset. int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); } bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); } KeysType keys_type() { return _schema->keys_type(); } diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index bb122c9834..950323f152 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -354,16 +354,10 @@ public: } } - void set_oldest_write_timestamp(int64_t timestamp) { - _rowset_meta_pb.set_oldest_write_timestamp(timestamp); - } - void set_newest_write_timestamp(int64_t timestamp) { _rowset_meta_pb.set_newest_write_timestamp(timestamp); } - int64_t oldest_write_timestamp() const { return _rowset_meta_pb.oldest_write_timestamp(); } - int64_t newest_write_timestamp() const { return _rowset_meta_pb.newest_write_timestamp(); } void set_tablet_schema(const TabletSchemaSPtr& tablet_schema) { diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 4186088fe5..4d979c6e21 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -63,7 +63,6 @@ public: virtual RowsetTypePB type() const = 0; - virtual int64_t oldest_write_timestamp() = 0; virtual int64_t newest_write_timestamp() = 0; virtual Status current_block_row_locations(std::vector<RowLocation>* locations) { return Status::NotSupported("to be implemented"); diff --git a/be/src/olap/rowset/rowset_writer_context.h b/be/src/olap/rowset/rowset_writer_context.h index 60f569e362..8669a072de 100644 --- a/be/src/olap/rowset/rowset_writer_context.h +++ b/be/src/olap/rowset/rowset_writer_context.h @@ -73,7 +73,6 @@ struct RowsetWriterContext { // (because it hard to refactor, and RowsetConvertor will be deprecated in future) DataDir* data_dir = nullptr; - int64_t oldest_write_timestamp; int64_t newest_write_timestamp; bool enable_unique_key_merge_on_write = false; std::set<int32_t> skip_inverted_index; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 63a9a211eb..1efe95e11f 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -460,7 +460,6 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea RowsetSharedPtr rowset = rowset_reader->rowset(); SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap(); - int64_t oldest_write_timestamp = rowset->oldest_write_timestamp(); int64_t newest_write_timestamp = rowset->newest_write_timestamp(); _temp_delta_versions.first = _temp_delta_versions.second; @@ -472,8 +471,7 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea RowsetSharedPtr rowset; RETURN_IF_ERROR(_internal_sorting( blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second), - oldest_write_timestamp, newest_write_timestamp, new_tablet, BETA_ROWSET, - segments_overlap, &rowset)); + newest_write_timestamp, new_tablet, BETA_ROWSET, segments_overlap, &rowset)); src_rowsets.push_back(rowset); for (auto& block : blocks) { @@ -529,8 +527,8 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea Status VSchemaChangeWithSorting::_internal_sorting( const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version, - int64_t oldest_write_timestamp, int64_t newest_write_timestamp, TabletSharedPtr new_tablet, - RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) { + int64_t newest_write_timestamp, TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type, + SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) { uint64_t merged_rows = 0; MultiBlockMerger merger(new_tablet); @@ -540,7 +538,6 @@ Status VSchemaChangeWithSorting::_internal_sorting( context.rowset_state = VISIBLE; context.segments_overlap = segments_overlap; context.tablet_schema = new_tablet->tablet_schema(); - context.oldest_write_timestamp = oldest_write_timestamp; context.newest_write_timestamp = newest_write_timestamp; RETURN_IF_ERROR(new_tablet->create_rowset_writer(context, &rowset_writer)); @@ -1051,7 +1048,6 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams context.rowset_state = VISIBLE; context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap(); context.tablet_schema = new_tablet->tablet_schema(); - context.oldest_write_timestamp = rs_reader->oldest_write_timestamp(); context.newest_write_timestamp = rs_reader->newest_write_timestamp(); context.fs = rs_reader->rowset()->rowset_meta()->fs(); Status status = new_tablet->create_rowset_writer(context, &rowset_writer); diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 099d1af81d..7920186a1f 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -158,10 +158,9 @@ private: TabletSharedPtr new_tablet, TabletSchemaSPtr base_tablet_schema) override; Status _internal_sorting(const std::vector<std::unique_ptr<vectorized::Block>>& blocks, - const Version& temp_delta_versions, int64_t oldest_write_timestamp, - int64_t newest_write_timestamp, TabletSharedPtr new_tablet, - RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap, - RowsetSharedPtr* rowset); + const Version& temp_delta_versions, int64_t newest_write_timestamp, + TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type, + SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset); Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer, TabletSharedPtr new_tablet); diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp index b6288530e9..d7166dd97d 100644 --- a/be/src/olap/snapshot_manager.cpp +++ b/be/src/olap/snapshot_manager.cpp @@ -261,7 +261,6 @@ Status SnapshotManager::_rename_rowset_id(const RowsetMetaPB& rs_meta_pb, org_rowset_meta->tablet_schema() ? org_rowset_meta->tablet_schema() : tablet_schema; context.rowset_state = org_rowset_meta->rowset_state(); context.version = org_rowset_meta->version(); - context.oldest_write_timestamp = org_rowset_meta->oldest_write_timestamp(); context.newest_write_timestamp = org_rowset_meta->newest_write_timestamp(); // keep segments_overlap same as origin rowset context.segments_overlap = rowset_meta->segments_overlap(); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 6ccd446fae..b0a6b63b41 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -395,10 +395,9 @@ private: scoped_refptr<Thread> _cache_file_cleaner_tasks_producer_thread; - std::unique_ptr<ThreadPool> _cooldown_thread_pool; + std::unique_ptr<PriorityThreadPool> _cooldown_thread_pool; std::mutex _running_cooldown_mutex; - std::unordered_map<DataDir*, int64_t> _running_cooldown_tasks_cnt; std::unordered_set<int64_t> _running_cooldown_tablets; DISALLOW_COPY_AND_ASSIGN(StorageEngine); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 92b203b339..32c0cc3d98 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -1556,7 +1556,6 @@ Status Tablet::create_initial_rowset(const int64_t req_version) { context.rowset_state = VISIBLE; context.segments_overlap = OVERLAP_UNKNOWN; context.tablet_schema = tablet_schema(); - context.oldest_write_timestamp = UnixSeconds(); context.newest_write_timestamp = UnixSeconds(); res = create_rowset_writer(context, &rs_writer); @@ -1890,14 +1889,6 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) { return false; } - int64_t oldest_cooldown_time = std::numeric_limits<int64_t>::max(); - if (cooldown_ttl_sec >= 0) { - oldest_cooldown_time = rowset->oldest_write_timestamp() + cooldown_ttl_sec; - } - if (cooldown_datetime > 0) { - oldest_cooldown_time = std::min(oldest_cooldown_time, cooldown_datetime); - } - int64_t newest_cooldown_time = std::numeric_limits<int64_t>::max(); if (cooldown_ttl_sec >= 0) { newest_cooldown_time = rowset->newest_write_timestamp() + cooldown_ttl_sec; @@ -1906,14 +1897,10 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) { newest_cooldown_time = std::min(newest_cooldown_time, cooldown_datetime); } - if (oldest_cooldown_time + config::cooldown_lag_time_sec < UnixSeconds()) { - *cooldown_timestamp = oldest_cooldown_time; - VLOG_DEBUG << "tablet need cooldown, tablet id: " << tablet_id() - << " cooldown_timestamp: " << *cooldown_timestamp; - return true; - } - + // the rowset should do cooldown job only if it's cooldown ttl plus newest write time is less than + // current time or it's datatime is less than current time if (newest_cooldown_time < UnixSeconds()) { + *cooldown_timestamp = newest_cooldown_time; *file_size = rowset->data_disk_size(); VLOG_DEBUG << "tablet need cooldown, tablet id: " << tablet_id() << " file_size: " << *file_size; @@ -1922,7 +1909,6 @@ bool Tablet::need_cooldown(int64_t* cooldown_timestamp, size_t* file_size) { VLOG_DEBUG << "tablet does not need cooldown, tablet id: " << tablet_id() << " ttl sec: " << cooldown_ttl_sec << " cooldown datetime: " << cooldown_datetime - << " oldest write time: " << rowset->oldest_write_timestamp() << " newest write time: " << rowset->newest_write_timestamp(); return false; } diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 87334cbfe2..407fb78520 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -1301,39 +1301,43 @@ struct SortCtx { SortCtx(TabletSharedPtr tablet, int64_t cooldown_timestamp, int64_t file_size) : tablet(tablet), cooldown_timestamp(cooldown_timestamp), file_size(file_size) {} TabletSharedPtr tablet; - int64_t cooldown_timestamp; + // to ensure the tablet with -1 would always be greater than other + uint64_t cooldown_timestamp; int64_t file_size; + bool operator<(const SortCtx& other) const { + if (this->cooldown_timestamp == other.cooldown_timestamp) { + return this->file_size > other.file_size; + } + return this->cooldown_timestamp < other.cooldown_timestamp; + } }; -void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets) { +void TabletManager::get_cooldown_tablets(std::vector<TabletSharedPtr>* tablets, + std::function<bool(const TabletSharedPtr&)> skip_tablet) { std::vector<SortCtx> sort_ctx_vec; + std::vector<std::weak_ptr<Tablet>> candidates; for (const auto& tablets_shard : _tablets_shards) { std::shared_lock rdlock(tablets_shard.lock); - for (const auto& item : tablets_shard.tablet_map) { - const TabletSharedPtr& tablet = item.second; - int64_t cooldown_timestamp = -1; - size_t file_size = -1; - if (tablet->need_cooldown(&cooldown_timestamp, &file_size)) { - sort_ctx_vec.emplace_back(tablet, cooldown_timestamp, file_size); - } - } - } - - std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end(), [](SortCtx a, SortCtx b) { - if (a.cooldown_timestamp != -1 && b.cooldown_timestamp != -1) { - return a.cooldown_timestamp < b.cooldown_timestamp; - } - - if (a.cooldown_timestamp != -1 && b.cooldown_timestamp == -1) { - return true; - } - - if (a.cooldown_timestamp == -1 && b.cooldown_timestamp != -1) { - return false; - } + std::for_each( + tablets_shard.tablet_map.begin(), tablets_shard.tablet_map.end(), + [&candidates](auto& tablet_pair) { candidates.emplace_back(tablet_pair.second); }); + } + std::for_each( + candidates.begin(), candidates.end(), + [&sort_ctx_vec, &skip_tablet](std::weak_ptr<Tablet>& t) { + const TabletSharedPtr& tablet = t.lock(); + if (UNLIKELY(nullptr == tablet)) { + return; + } + std::shared_lock rdlock(tablet->get_header_lock()); + int64_t cooldown_timestamp = -1; + size_t file_size = -1; + if (skip_tablet(tablet) && tablet->need_cooldown(&cooldown_timestamp, &file_size)) { + sort_ctx_vec.emplace_back(tablet, cooldown_timestamp, file_size); + } + }); - return a.file_size > b.file_size; - }); + std::sort(sort_ctx_vec.begin(), sort_ctx_vec.end()); for (SortCtx& ctx : sort_ctx_vec) { VLOG_DEBUG << "get cooldown tablet: " << ctx.tablet->tablet_id(); diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 667013d268..7ae4060142 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -139,7 +139,8 @@ public: void get_tablets_distribution_on_different_disks( std::map<int64_t, std::map<DataDir*, int64_t>>& tablets_num_on_disk, std::map<int64_t, std::map<DataDir*, std::vector<TabletSize>>>& tablets_info_on_disk); - void get_cooldown_tablets(std::vector<TabletSharedPtr>* tables); + void get_cooldown_tablets(std::vector<TabletSharedPtr>* tables, + std::function<bool(const TabletSharedPtr&)> skip_tablet); void get_all_tablets_storage_format(TCheckStorageFormatResult* result); diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp index ff90604017..321825c891 100644 --- a/be/src/util/threadpool.cpp +++ b/be/src/util/threadpool.cpp @@ -73,12 +73,6 @@ ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { return *this; } -Status ThreadPoolBuilder::build(std::unique_ptr<ThreadPool>* pool) const { - pool->reset(new ThreadPool(*this)); - RETURN_IF_ERROR((*pool)->init()); - return Status::OK(); -} - ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode, int max_concurrency) : _mode(mode), diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h index bf07e16846..89052274b0 100644 --- a/be/src/util/threadpool.h +++ b/be/src/util/threadpool.h @@ -27,11 +27,13 @@ #include <functional> #include <memory> #include <string> +#include <type_traits> #include <unordered_set> #include <utility> #include "common/status.h" #include "gutil/ref_counted.h" +#include "util/priority_thread_pool.hpp" namespace doris { @@ -105,7 +107,18 @@ public: return *this; } // Instantiate a new ThreadPool with the existing builder arguments. - Status build(std::unique_ptr<ThreadPool>* pool) const; + template <typename ThreadPoolType> + Status build(std::unique_ptr<ThreadPoolType>* pool) const { + if constexpr (std::is_same_v<ThreadPoolType, ThreadPool>) { + pool->reset(new ThreadPoolType(*this)); + RETURN_IF_ERROR((*pool)->init()); + } else if constexpr (std::is_same_v<ThreadPoolType, PriorityThreadPool>) { + pool->reset(new ThreadPoolType(_max_threads, _max_queue_size, _name)); + } else { + static_assert(always_false_v<ThreadPoolType>, "Unsupported ThreadPoolType"); + } + return Status::OK(); + } private: friend class ThreadPool; @@ -117,6 +130,9 @@ private: ThreadPoolBuilder(const ThreadPoolBuilder&) = delete; void operator=(const ThreadPoolBuilder&) = delete; + + template <typename T> + static constexpr bool always_false_v = false; }; // Thread pool with a variable number of threads. diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp index fe6435930a..9400e0e88c 100644 --- a/be/test/olap/tablet_test.cpp +++ b/be/test/olap/tablet_test.cpp @@ -111,10 +111,8 @@ public: pb1->set_tablet_schema(_tablet_meta->tablet_schema()); } - void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end, int64_t earliest_ts, - int64_t latest_ts) { + void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end, int64_t latest_ts) { pb1->init_from_json(_json_rowset_meta); - pb1->set_oldest_write_timestamp(earliest_ts); pb1->set_newest_write_timestamp(latest_ts); pb1->set_start_version(start); pb1->set_end_version(end); @@ -282,27 +280,27 @@ TEST_F(TestTablet, pad_rowset) { TEST_F(TestTablet, cooldown_policy) { std::vector<RowsetMetaSharedPtr> rs_metas; RowsetMetaSharedPtr ptr1(new RowsetMeta()); - init_rs_meta(ptr1, 0, 2, 100, 200); + init_rs_meta(ptr1, 0, 2, 200); rs_metas.push_back(ptr1); RowsetSharedPtr rowset1 = make_shared<BetaRowset>(nullptr, "", ptr1); RowsetMetaSharedPtr ptr2(new RowsetMeta()); - init_rs_meta(ptr2, 3, 4, 300, 600); + init_rs_meta(ptr2, 3, 4, 600); rs_metas.push_back(ptr2); RowsetSharedPtr rowset2 = make_shared<BetaRowset>(nullptr, "", ptr2); RowsetMetaSharedPtr ptr3(new RowsetMeta()); - init_rs_meta(ptr3, 5, 5, 800, 800); + init_rs_meta(ptr3, 5, 5, 800); rs_metas.push_back(ptr3); RowsetSharedPtr rowset3 = make_shared<BetaRowset>(nullptr, "", ptr3); RowsetMetaSharedPtr ptr4(new RowsetMeta()); - init_rs_meta(ptr4, 6, 7, 1100, 1400); + init_rs_meta(ptr4, 6, 7, 1400); rs_metas.push_back(ptr4); RowsetSharedPtr rowset4 = make_shared<BetaRowset>(nullptr, "", ptr4); RowsetMetaSharedPtr ptr5(new RowsetMeta()); - init_rs_meta(ptr5, 8, 9, 1800, 2000); + init_rs_meta(ptr5, 8, 9, 2000); rs_metas.push_back(ptr5); RowsetSharedPtr rowset5 = make_shared<BetaRowset>(nullptr, "", ptr5); @@ -322,6 +320,7 @@ TEST_F(TestTablet, cooldown_policy) { _tablet->_rs_version_map[ptr5->version()] = rowset5; _tablet->set_cumulative_layer_point(20); + sleep(30); { auto storage_policy = std::make_shared<StoragePolicy>(); @@ -334,7 +333,7 @@ TEST_F(TestTablet, cooldown_policy) { bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size); ASSERT_TRUE(ret); ASSERT_EQ(cooldown_timestamp, 250); - ASSERT_EQ(file_size, -1); + ASSERT_EQ(file_size, 84699); } { @@ -347,8 +346,8 @@ TEST_F(TestTablet, cooldown_policy) { size_t file_size = -1; bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size); ASSERT_TRUE(ret); - ASSERT_EQ(cooldown_timestamp, 3700); - ASSERT_EQ(file_size, -1); + ASSERT_EQ(cooldown_timestamp, 3800); + ASSERT_EQ(file_size, 84699); } { @@ -374,8 +373,11 @@ TEST_F(TestTablet, cooldown_policy) { int64_t cooldown_timestamp = -1; size_t file_size = -1; bool ret = _tablet->need_cooldown(&cooldown_timestamp, &file_size); + // the rowset with earliest version woule be picked up to do cooldown of which the timestamp + // is UnixSeconds() - 250 + int64_t expect_cooldown_timestamp = UnixSeconds() - 50; ASSERT_TRUE(ret); - ASSERT_EQ(cooldown_timestamp, -1); + ASSERT_EQ(cooldown_timestamp, expect_cooldown_timestamp); ASSERT_EQ(file_size, 84699); } } diff --git a/docs/en/docs/admin-manual/system-table/rowsets.md b/docs/en/docs/admin-manual/system-table/rowsets.md index 7c1664be25..2ad7c7a5c6 100644 --- a/docs/en/docs/admin-manual/system-table/rowsets.md +++ b/docs/en/docs/admin-manual/system-table/rowsets.md @@ -51,7 +51,6 @@ MySQL [(none)]> desc information_schema.rowsets; | INDEX_DISK_SIZE | BIGINT | Yes | false | NULL | | | DATA_DISK_SIZE | BIGINT | Yes | false | NULL | | | CREATION_TIME | BIGINT | Yes | false | NULL | | -| OLDEST_WRITE_TIMESTAMP | BIGINT | Yes | false | NULL | | | NEWEST_WRITE_TIMESTAMP | BIGINT | Yes | false | NULL | | +------------------------+------------+------+-------+---------+-------+ ``` diff --git a/docs/zh-CN/docs/admin-manual/system-table/rowsets.md b/docs/zh-CN/docs/admin-manual/system-table/rowsets.md index c7579b0234..159c6dc288 100644 --- a/docs/zh-CN/docs/admin-manual/system-table/rowsets.md +++ b/docs/zh-CN/docs/admin-manual/system-table/rowsets.md @@ -55,7 +55,6 @@ MySQL [(none)]> desc information_schema.rowsets; | INDEX_DISK_SIZE | BIGINT | Yes | false | NULL | | | DATA_DISK_SIZE | BIGINT | Yes | false | NULL | | | CREATION_TIME | BIGINT | Yes | false | NULL | | -| OLDEST_WRITE_TIMESTAMP | BIGINT | Yes | false | NULL | | | NEWEST_WRITE_TIMESTAMP | BIGINT | Yes | false | NULL | | +------------------------+------------+------+-------+---------+-------+ ``` diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java index 9769006569..4f6ddb032e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java @@ -384,7 +384,6 @@ public class SchemaTable extends Table { .column("INDEX_DISK_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) .column("DATA_DISK_SIZE", ScalarType.createType(PrimitiveType.BIGINT)) .column("CREATION_TIME", ScalarType.createType(PrimitiveType.BIGINT)) - .column("OLDEST_WRITE_TIMESTAMP", ScalarType.createType(PrimitiveType.BIGINT)) .column("NEWEST_WRITE_TIMESTAMP", ScalarType.createType(PrimitiveType.BIGINT)) .build())) .put("backends", new SchemaTable(SystemIdGenerator.getNextId(), "backends", TableType.SCHEMA, diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 4452373335..4e65af4f06 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -100,8 +100,8 @@ message RowsetMetaPB { optional string rowset_id_v2 = 23; // resource id optional string resource_id = 24; - // earliest write time - optional int64 oldest_write_timestamp = 25 [default = -1]; + // used to be oldest write time: earliest write time + reserved 25; // latest write time optional int64 newest_write_timestamp = 26 [default = -1]; // the encoded segment min/max key of segments in this rowset, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org