This is an automated email from the ASF dual-hosted git repository. jakevin pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 88170a94e8737e19696f614b1ea21ca5d0f407d3 Author: zzzxl1993 <474696...@qq.com> AuthorDate: Tue Mar 19 18:49:43 2024 +0800 (invert index) modify of time series compaction policy --- be/src/agent/task_worker_pool.cpp | 23 +++-- be/src/olap/cumulative_compaction.cpp | 8 +- be/src/olap/cumulative_compaction_policy.h | 8 ++ .../cumulative_compaction_time_series_policy.cpp | 108 +++++++++++++++++++-- .../cumulative_compaction_time_series_policy.h | 4 + be/src/olap/full_compaction.cpp | 6 +- be/src/olap/rowset/rowset_meta.h | 6 ++ be/src/olap/tablet_meta.cpp | 17 +++- be/src/olap/tablet_meta.h | 10 +- .../Create/CREATE-TABLE.md | 9 ++ .../Create/CREATE-TABLE.md | 8 ++ .../main/java/org/apache/doris/alter/Alter.java | 4 +- .../java/org/apache/doris/alter/RollupJobV2.java | 1 + .../apache/doris/alter/SchemaChangeHandler.java | 7 ++ .../org/apache/doris/alter/SchemaChangeJobV2.java | 1 + .../analysis/ModifyTablePropertiesClause.java | 17 ++++ .../java/org/apache/doris/backup/RestoreJob.java | 1 + .../main/java/org/apache/doris/catalog/Env.java | 11 ++- .../java/org/apache/doris/catalog/OlapTable.java | 14 +++ .../org/apache/doris/catalog/TableProperty.java | 18 +++- .../apache/doris/common/util/PropertyAnalyzer.java | 29 +++++- .../apache/doris/datasource/InternalCatalog.java | 20 +++- .../org/apache/doris/master/ReportHandler.java | 1 + .../org/apache/doris/task/CreateReplicaTask.java | 5 + .../doris/task/UpdateTabletMetaInfoTask.java | 5 + .../java/org/apache/doris/task/AgentTaskTest.java | 2 +- gensrc/proto/olap_file.proto | 2 + gensrc/thrift/AgentService.thrift | 2 + 28 files changed, 320 insertions(+), 27 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 00c16cfa12d..34918a5b0a2 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -52,6 +52,7 @@ #include "io/fs/local_file_system.h" #include "io/fs/path.h" #include "io/fs/s3_file_system.h" +#include 
"olap/cumulative_compaction_time_series_policy.h" #include "olap/data_dir.h" #include "olap/olap_common.h" #include "olap/rowset/rowset_meta.h" @@ -691,8 +692,8 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& need_to_save = true; } if (tablet_meta_info.__isset.compaction_policy) { - if (tablet_meta_info.compaction_policy != "size_based" && - tablet_meta_info.compaction_policy != "time_series") { + if (tablet_meta_info.compaction_policy != CUMULATIVE_SIZE_BASED_POLICY && + tablet_meta_info.compaction_policy != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "invalid compaction policy, only support for size_based or " "time_series"); @@ -702,7 +703,7 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& need_to_save = true; } if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { + if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "only time series compaction policy support time series config"); continue; @@ -712,7 +713,7 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& need_to_save = true; } if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { + if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "only time series compaction policy support time series config"); continue; @@ -722,7 +723,7 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& need_to_save = true; } if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { + if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { status = 
Status::InvalidArgument( "only time series compaction policy support time series config"); continue; @@ -732,7 +733,7 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& need_to_save = true; } if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) { - if (tablet->tablet_meta()->compaction_policy() != "time_series") { + if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { status = Status::InvalidArgument( "only time series compaction policy support time series config"); continue; @@ -741,6 +742,16 @@ void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& tablet_meta_info.time_series_compaction_empty_rowsets_threshold); need_to_save = true; } + if (tablet_meta_info.__isset.time_series_compaction_level_threshold) { + if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) { + status = Status::InvalidArgument( + "only time series compaction policy support time series config"); + continue; + } + tablet->tablet_meta()->set_time_series_compaction_level_threshold( + tablet_meta_info.time_series_compaction_level_threshold); + need_to_save = true; + } if (tablet_meta_info.__isset.replica_id) { tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id); } diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 1f54c1f3285..42748012cab 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -81,13 +81,17 @@ Status CumulativeCompaction::execute_compact_impl() { // 4. set state to success _state = CompactionState::SUCCESS; - // 5. set cumulative point + // 5. set cumulative level + _tablet->cumulative_compaction_policy()->update_compaction_level(_tablet.get(), _input_rowsets, + _output_rowset); + + // 6. 
set cumulative point _tablet->cumulative_compaction_policy()->update_cumulative_point( _tablet.get(), _input_rowsets, _output_rowset, _last_delete_version); VLOG_CRITICAL << "after cumulative compaction, current cumulative point is " << _tablet->cumulative_layer_point() << ", tablet=" << _tablet->tablet_id(); - // 6. add metric to cumulative compaction + // 7. add metric to cumulative compaction DorisMetrics::instance()->cumulative_compaction_deltas_total->increment(_input_rowsets.size()); DorisMetrics::instance()->cumulative_compaction_bytes_total->increment(_input_rowsets_size); diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h index b9b3bb46c0f..e2e5ca5460d 100644 --- a/be/src/olap/cumulative_compaction_policy.h +++ b/be/src/olap/cumulative_compaction_policy.h @@ -96,6 +96,11 @@ public: int64_t current_cumulative_point, int64_t* cumulative_point) = 0; + // Updates the compaction level of a tablet after a compaction operation. + virtual void update_compaction_level(Tablet* tablet, + const std::vector<RowsetSharedPtr>& input_rowsets, + RowsetSharedPtr output_rowset) = 0; + /// Fetch cumulative policy name virtual std::string_view name() = 0; }; @@ -149,6 +154,9 @@ public: /// Its main policy is calculating the accumulative compaction score after current cumulative_point in tablet. 
uint32_t calc_cumulative_compaction_score(Tablet* tablet) override; + void update_compaction_level(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets, + RowsetSharedPtr output_rowset) override {} + std::string_view name() override { return CUMULATIVE_SIZE_BASED_POLICY; } private: diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp b/be/src/olap/cumulative_compaction_time_series_policy.cpp index 6f7cb8e2e35..6c3f949723a 100644 --- a/be/src/olap/cumulative_compaction_time_series_policy.cpp +++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp @@ -17,19 +17,24 @@ #include "olap/cumulative_compaction_time_series_policy.h" +#include <algorithm> + #include "common/logging.h" #include "olap/tablet.h" +#include "olap/tablet_meta.h" #include "util/time.h" namespace doris { uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) { uint32_t score = 0; + uint32_t level0_score = 0; bool base_rowset_exist = false; const int64_t point = tablet->cumulative_layer_point(); - int64_t total_size = 0; + int64_t level0_total_size = 0; RowsetMetaSharedPtr first_meta; int64_t first_version = INT64_MAX; + std::list<RowsetMetaSharedPtr> checked_rs_metas; // NOTE: tablet._meta_lock is hold auto& rs_metas = tablet->tablet_meta()->all_rs_metas(); // check the base rowset and collect the rowsets of cumulative part @@ -47,8 +52,13 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( continue; } else { // collect the rowsets of cumulative part - total_size += rs_meta->total_disk_size(); score += rs_meta->get_compaction_score(); + if (rs_meta->compaction_level() == 0) { + level0_total_size += rs_meta->total_disk_size(); + level0_score += rs_meta->get_compaction_score(); + } else { + checked_rs_metas.push_back(rs_meta); + } } } @@ -73,21 +83,39 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score( // Condition 1: the size of input files for compaction meets 
the requirement of parameter compaction_goal_size int64_t compaction_goal_size_mbytes = tablet->tablet_meta()->time_series_compaction_goal_size_mbytes(); - if (total_size >= compaction_goal_size_mbytes * 1024 * 1024) { + if (level0_total_size >= compaction_goal_size_mbytes * 1024 * 1024) { return score; } // Condition 2: the number of input files reaches the threshold specified by parameter compaction_file_count_threshold - if (score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) { + if (level0_score >= tablet->tablet_meta()->time_series_compaction_file_count_threshold()) { return score; } + // Condition 3: level1 achieve compaction_goal_size + if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) { + checked_rs_metas.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) { + return a->version().first < b->version().first; + }); + int32_t rs_meta_count = 0; + int64_t continuous_size = 0; + for (const auto& rs_meta : checked_rs_metas) { + rs_meta_count++; + continuous_size += rs_meta->total_disk_size(); + if (rs_meta_count >= 2) { + if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) { + return score; + } + } + } + } + int64_t now = UnixMillis(); int64_t last_cumu = tablet->last_cumu_compaction_success_time(); if (last_cumu != 0) { int64_t cumu_interval = now - last_cumu; - // Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second + // Condition 4: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second if (cumu_interval > (tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) { return score; @@ -163,6 +191,13 @@ void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point( break; } + // upgrade: [0 0 2 1 1 0 0] + if (!is_delete && tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 && + rs->compaction_level() == 1) { 
+ *ret_cumulative_point = rs->version().first; + break; + } + // include one situation: When the segment is not deleted, and is singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase prev_version = rs->version().second; *ret_cumulative_point = prev_version + 1; @@ -193,6 +228,9 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( return 0; } + int64_t compaction_goal_size_mbytes = + tablet->tablet_meta()->time_series_compaction_goal_size_mbytes(); + int transient_size = 0; *compaction_score = 0; input_rowsets->clear(); @@ -231,8 +269,7 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( input_rowsets->push_back(rowset); // Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size - if (total_size >= - (tablet->tablet_meta()->time_series_compaction_goal_size_mbytes() * 1024 * 1024)) { + if (total_size >= (compaction_goal_size_mbytes * 1024 * 1024)) { if (input_rowsets->size() == 1 && !input_rowsets->front()->rowset_meta()->is_segments_overlapping()) { // Only 1 non-overlapping rowset, skip it @@ -262,20 +299,47 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( return transient_size; } + // Condition 3: level1 achieve compaction_goal_size + std::vector<RowsetSharedPtr> level1_rowsets; + if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) { + int64_t continuous_size = 0; + for (const auto& rowset : candidate_rowsets) { + const auto& rs_meta = rowset->rowset_meta(); + if (rs_meta->compaction_level() == 0) { + break; + } + level1_rowsets.push_back(rowset); + continuous_size += rs_meta->total_disk_size(); + if (level1_rowsets.size() >= 2) { + if (continuous_size >= compaction_goal_size_mbytes * 1024 * 1024) { + input_rowsets->swap(level1_rowsets); + return input_rowsets->size(); + } + } + } + } + int64_t now = UnixMillis(); int64_t last_cumu = tablet->last_cumu_compaction_success_time(); if (last_cumu != 0) { int64_t cumu_interval = now - 
last_cumu; - // Condition 3: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second + // Condition 4: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second if (cumu_interval > (tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() * 1000)) { + if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) { + if (input_rowsets->empty() && level1_rowsets.size() >= 2) { + input_rowsets->swap(level1_rowsets); + return input_rowsets->size(); + } + } return transient_size; } } input_rowsets->clear(); *compaction_score = 0; + return 0; } @@ -288,7 +352,35 @@ void TimeSeriesCumulativeCompactionPolicy::update_cumulative_point( return; } + if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 && + output_rowset->rowset_meta()->compaction_level() < 2) { + return; + } + tablet->set_cumulative_layer_point(output_rowset->end_version() + 1); } +void TimeSeriesCumulativeCompactionPolicy::update_compaction_level( + Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets, + RowsetSharedPtr output_rowset) { + if (tablet->tablet_state() != TABLET_RUNNING || output_rowset->num_segments() == 0) { + return; + } + + int64_t first_level = 0; + for (size_t i = 0; i < input_rowsets.size(); i++) { + int64_t cur_level = input_rowsets[i]->rowset_meta()->compaction_level(); + if (i == 0) { + first_level = cur_level; + } else { + if (first_level != cur_level) { + LOG(ERROR) << "Failed to check compaction level, first_level: " << first_level + << ", cur_level: " << cur_level; + } + } + } + + output_rowset->rowset_meta()->set_compaction_level(first_level + 1); +} + } // namespace doris \ No newline at end of file diff --git a/be/src/olap/cumulative_compaction_time_series_policy.h b/be/src/olap/cumulative_compaction_time_series_policy.h index 015dce055e9..4c134202e1d 100644 --- 
a/be/src/olap/cumulative_compaction_time_series_policy.h +++ b/be/src/olap/cumulative_compaction_time_series_policy.h @@ -59,6 +59,10 @@ public: void update_cumulative_point(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets, RowsetSharedPtr _output_rowset, Version& last_delete_version) override; + + void update_compaction_level(Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets, + RowsetSharedPtr output_rowset) override; + std::string_view name() override { return CUMULATIVE_TIME_SERIES_POLICY; } }; diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 927b4a33198..ba336c7cbe9 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -73,7 +73,11 @@ Status FullCompaction::execute_compact_impl() { // 3. set state to success _state = CompactionState::SUCCESS; - // 4. set cumulative point + // 4. set cumulative level + _tablet->cumulative_compaction_policy()->update_compaction_level(_tablet.get(), _input_rowsets, + _output_rowset); + + // 5. 
set cumulative point Version last_version = _input_rowsets.back()->version(); _tablet->cumulative_compaction_policy()->update_cumulative_point(_tablet.get(), _input_rowsets, _output_rowset, last_version); diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 7e1dfaa57c3..30457d30bc6 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -304,6 +304,12 @@ public: const TabletSchemaSPtr& tablet_schema() { return _schema; } + void set_compaction_level(int64_t compaction_level) { + _rowset_meta_pb.set_compaction_level(compaction_level); + } + + int64_t compaction_level() { return _rowset_meta_pb.compaction_level(); } + // Because the member field '_handle' is a raw pointer, use member func 'init' to replace copy ctor RowsetMeta(const RowsetMeta&) = delete; RowsetMeta operator=(const RowsetMeta&) = delete; diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 07cc07bc873..23f4428f747 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -71,7 +71,8 @@ TabletMetaSharedPtr TabletMeta::create( request.time_series_compaction_file_count_threshold, request.time_series_compaction_time_threshold_seconds, request.time_series_compaction_empty_rowsets_threshold, - request.inverted_index_storage_format); + request.inverted_index_storage_format, + request.time_series_compaction_level_threshold); } TabletMeta::TabletMeta() @@ -91,7 +92,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id int64_t time_series_compaction_file_count_threshold, int64_t time_series_compaction_time_threshold_seconds, int64_t time_series_compaction_empty_rowsets_threshold, - TInvertedIndexStorageFormat::type inverted_index_storage_format) + TInvertedIndexStorageFormat::type inverted_index_storage_format, + int64_t time_series_compaction_level_threshold) : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap(tablet_id)) { @@ -121,6 +123,8 @@ 
TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id time_series_compaction_time_threshold_seconds); tablet_meta_pb.set_time_series_compaction_empty_rowsets_threshold( time_series_compaction_empty_rowsets_threshold); + tablet_meta_pb.set_time_series_compaction_level_threshold( + time_series_compaction_level_threshold); TabletSchemaPB* schema = tablet_meta_pb.mutable_schema(); schema->set_num_short_key_columns(tablet_schema.short_key_column_count); schema->set_num_rows_per_row_block(config::default_num_rows_per_column_file_block); @@ -334,7 +338,8 @@ TabletMeta::TabletMeta(const TabletMeta& b) _time_series_compaction_time_threshold_seconds( b._time_series_compaction_time_threshold_seconds), _time_series_compaction_empty_rowsets_threshold( - b._time_series_compaction_empty_rowsets_threshold) {}; + b._time_series_compaction_empty_rowsets_threshold), + _time_series_compaction_level_threshold(b._time_series_compaction_level_threshold) {}; void TabletMeta::init_column_from_tcolumn(uint32_t unique_id, const TColumn& tcolumn, ColumnPB* column) { @@ -635,6 +640,8 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) { tablet_meta_pb.time_series_compaction_time_threshold_seconds(); _time_series_compaction_empty_rowsets_threshold = tablet_meta_pb.time_series_compaction_empty_rowsets_threshold(); + _time_series_compaction_level_threshold = + tablet_meta_pb.time_series_compaction_level_threshold(); } void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { @@ -718,6 +725,8 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) { time_series_compaction_time_threshold_seconds()); tablet_meta_pb->set_time_series_compaction_empty_rowsets_threshold( time_series_compaction_empty_rowsets_threshold()); + tablet_meta_pb->set_time_series_compaction_level_threshold( + time_series_compaction_level_threshold()); } int64_t TabletMeta::mem_size() const { @@ -906,6 +915,8 @@ bool operator==(const TabletMeta& a, const TabletMeta& b) { if 
(a._time_series_compaction_empty_rowsets_threshold != b._time_series_compaction_empty_rowsets_threshold) return false; + if (a._time_series_compaction_level_threshold != b._time_series_compaction_level_threshold) + return false; return true; } diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 4400be42af9..d21354366a9 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -112,7 +112,8 @@ public: int64_t time_series_compaction_time_threshold_seconds = 3600, int64_t time_series_compaction_empty_rowsets_threshold = 5, TInvertedIndexStorageFormat::type inverted_index_storage_format = - TInvertedIndexStorageFormat::V1); + TInvertedIndexStorageFormat::V1, + int64_t time_series_compaction_level_threshold = 1); // If need add a filed in TableMeta, filed init copy in copy construct function TabletMeta(const TabletMeta& tablet_meta); TabletMeta(TabletMeta&& tablet_meta) = delete; @@ -260,6 +261,12 @@ public: int64_t time_series_compaction_empty_rowsets_threshold() const { return _time_series_compaction_empty_rowsets_threshold; } + void set_time_series_compaction_level_threshold(int64_t level_threshold) { + _time_series_compaction_level_threshold = level_threshold; + } + int64_t time_series_compaction_level_threshold() const { + return _time_series_compaction_level_threshold; + } private: Status _save_meta(DataDir* data_dir); @@ -313,6 +320,7 @@ private: int64_t _time_series_compaction_file_count_threshold = 0; int64_t _time_series_compaction_time_threshold_seconds = 0; int64_t _time_series_compaction_empty_rowsets_threshold = 0; + int64_t _time_series_compaction_level_threshold = 0; mutable std::shared_mutex _meta_lock; }; diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md index 44dbf5cceb8..0bf9590ebf1 100644 --- 
a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md +++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md @@ -461,6 +461,15 @@ Set table properties. The following attributes are currently supported: `"time_series_compaction_time_threshold_seconds" = "3600"` +* `time_series_compaction_level_threshold` + + When time series compaction policy is applied, this parameter defaults to 1. When set to 2, it is used to control the re-merging of segments that have been + + merged once, ensuring that the segment size reaches the time_series_compaction_goal_size_mbytes, which can achieve the effect of reducing the number of + + segments. + + `"time_series_compaction_level_threshold" = "2"` * Dynamic partition related diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md index e8ac8e69f0a..657adea881d 100644 --- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md +++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md @@ -444,6 +444,14 @@ UNIQUE KEY(k1, k2) `"time_series_compaction_time_threshold_seconds" = "3600"` +* `time_series_compaction_level_threshold` + + compaction 的合并策略为 time_series 时,此参数默认为1,当设置为2时用来控制对于合并过一次的段再合并一层,保证段大小达到time_series_compaction_goal_size_mbytes, + + 能达到段数量减少的效果。 + + `"time_series_compaction_level_threshold" = "2"` + * 动态分区相关 动态分区相关参数如下: diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java index 63759885294..1d408b6799f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java @@ -509,7 +509,9 @@ public class Alter { || properties .containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD) || 
properties - .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)); + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD) + || properties + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)); ((SchemaChangeHandler) schemaChangeHandler).updateTableProperties(db, tableName, properties); } else { throw new DdlException("Invalid alter operation: " + alterClause.getOpType()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index f60a47632ab..61e4496a358 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -276,6 +276,7 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable { tbl.getTimeSeriesCompactionFileCountThreshold(), tbl.getTimeSeriesCompactionTimeThresholdSeconds(), tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), + tbl.getTimeSeriesCompactionLevelThreshold(), tbl.storeRowColumn(), binlogConfig); createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 55c939a4669..b4d032dc78c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2234,6 +2234,13 @@ public class SchemaChangeHandler extends AlterHandler { .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD))); } + if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) { + timeSeriesCompactionConfig + .put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD, + Long.parseLong(properties + 
.get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))); + } + if (isInMemory < 0 && storagePolicyId < 0 && compactionPolicy == null && timeSeriesCompactionConfig.isEmpty() && !properties.containsKey(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED) && !properties.containsKey(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION) diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 4a8588a0ff2..011cc53979f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -281,6 +281,7 @@ public class SchemaChangeJobV2 extends AlterJobV2 { tbl.getTimeSeriesCompactionFileCountThreshold(), tbl.getTimeSeriesCompactionTimeThresholdSeconds(), tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), + tbl.getTimeSeriesCompactionLevelThreshold(), tbl.storeRowColumn(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java index cc4dede8f12..c2bc7bc7d0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyTablePropertiesClause.java @@ -222,6 +222,23 @@ public class ModifyTablePropertiesClause extends AlterTableClause { } this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) { + long levelThreshold; + String levelThresholdStr = properties + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD); + try { + levelThreshold = Long.parseLong(levelThresholdStr); + if (levelThreshold < 1 || levelThreshold > 2) { + throw new AnalysisException( + 
"time_series_compaction_level_threshold can not be less than 1 or greater than 2:" + + levelThresholdStr); + } + } catch (NumberFormatException e) { + throw new AnalysisException("Invalid time_series_compaction_level_threshold format: " + + levelThresholdStr); + } + this.needTableStable = false; + this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD)) { if (!properties.get(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD).equalsIgnoreCase("true") && !properties.get(PropertyAnalyzer diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java index d4470a1029e..2e6bbb162ac 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java @@ -1084,6 +1084,7 @@ public class RestoreJob extends AbstractJob { localTbl.getTimeSeriesCompactionFileCountThreshold(), localTbl.getTimeSeriesCompactionTimeThresholdSeconds(), localTbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), + localTbl.getTimeSeriesCompactionLevelThreshold(), localTbl.storeRowColumn(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 5bf2f3eb1b5..632597bf6df 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3445,6 +3445,14 @@ public class Env { sb.append(olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()).append("\""); } + // time series compaction level threshold + if (olapTable.getCompactionPolicy() != null && olapTable.getCompactionPolicy() + .equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) { + sb.append(",\n\"").append(PropertyAnalyzer + .PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD).append("\" = \""); + 
sb.append(olapTable.getTimeSeriesCompactionLevelThreshold()).append("\""); + } + // disable auto compaction sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION).append("\" = \""); sb.append(olapTable.disableAutoCompaction()).append("\""); @@ -4874,7 +4882,8 @@ public class Env { .buildSkipWriteIndexOnLoad() .buildDisableAutoCompaction() .buildEnableSingleReplicaCompaction() - .buildTimeSeriesCompactionEmptyRowsetsThreshold(); + .buildTimeSeriesCompactionEmptyRowsetsThreshold() + .buildTimeSeriesCompactionLevelThreshold(); // need to update partition info meta for (Partition partition : table.getPartitions()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 0badc90fd3f..78e463a8a23 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -2162,6 +2162,20 @@ public class OlapTable extends Table implements MTMVRelatedTableIf { return null; } + public void setTimeSeriesCompactionLevelThreshold(long timeSeriesCompactionLevelThreshold) { + TableProperty tableProperty = getOrCreatTableProperty(); + tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD, + Long.valueOf(timeSeriesCompactionLevelThreshold).toString()); + tableProperty.buildTimeSeriesCompactionLevelThreshold(); + } + + public Long getTimeSeriesCompactionLevelThreshold() { + if (tableProperty != null) { + return tableProperty.timeSeriesCompactionLevelThreshold(); + } + return null; + } + public int getBaseSchemaVersion() { MaterializedIndexMeta baseIndexMeta = indexIdToMeta.get(baseIndexId); return baseIndexMeta.getSchemaVersion(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java index dd3dd165d77..b41e8b4c37b 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java @@ -106,6 +106,9 @@ public class TableProperty implements Writable { private long timeSeriesCompactionEmptyRowsetsThreshold = PropertyAnalyzer.TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE; + private long timeSeriesCompactionLevelThreshold + = PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE; + private DataSortInfo dataSortInfo = new DataSortInfo(); public TableProperty(Map<String, String> properties) { @@ -143,6 +146,7 @@ public class TableProperty implements Writable { buildEnableSingleReplicaCompaction(); buildDisableAutoCompaction(); buildTimeSeriesCompactionEmptyRowsetsThreshold(); + buildTimeSeriesCompactionLevelThreshold(); break; default: break; @@ -300,6 +304,17 @@ public class TableProperty implements Writable { return timeSeriesCompactionEmptyRowsetsThreshold; } + public TableProperty buildTimeSeriesCompactionLevelThreshold() { + timeSeriesCompactionLevelThreshold = Long.parseLong(properties + .getOrDefault(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD, + String.valueOf(PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE))); + return this; + } + + public long timeSeriesCompactionLevelThreshold() { + return timeSeriesCompactionLevelThreshold; + } + public TableProperty buildMinLoadReplicaNum() { minLoadReplicaNum = Short.parseShort( properties.getOrDefault(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM, "-1")); @@ -587,7 +602,8 @@ public class TableProperty implements Writable { .buildTimeSeriesCompactionTimeThresholdSeconds() .buildDisableAutoCompaction() .buildEnableSingleReplicaCompaction() - .buildTimeSeriesCompactionEmptyRowsetsThreshold(); + .buildTimeSeriesCompactionEmptyRowsetsThreshold() + .buildTimeSeriesCompactionLevelThreshold(); if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_105) { // get replica num from 
property map and create replica allocation String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM); diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java index 9bacb50db32..9737d95e2e6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java @@ -151,6 +151,9 @@ public class PropertyAnalyzer { public static final String PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD = "time_series_compaction_empty_rowsets_threshold"; + public static final String PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD = + "time_series_compaction_level_threshold"; + public static final String PROPERTIES_MUTABLE = "mutable"; public static final String PROPERTIES_IS_BEING_SYNCED = "is_being_synced"; @@ -198,7 +201,7 @@ public class PropertyAnalyzer { public static final long TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE = 2000; public static final long TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS_DEFAULT_VALUE = 3600; public static final long TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD_DEFAULT_VALUE = 5; - + public static final long TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE = 1; /** * check and replace members of DataProperty by properties. 
@@ -751,6 +754,30 @@ public class PropertyAnalyzer { return emptyRowsetsThreshold; } + public static long analyzeTimeSeriesCompactionLevelThreshold(Map<String, String> properties) + throws AnalysisException { + long levelThreshold = TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE; + if (properties == null || properties.isEmpty()) { + return levelThreshold; + } + if (properties.containsKey(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) { + String levelThresholdStr = properties + .get(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD); + properties.remove(PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD); + try { + levelThreshold = Long.parseLong(levelThresholdStr); + if (levelThreshold < 1 || levelThreshold > 2) { + throw new AnalysisException("time_series_compaction_level_threshold can not" + + " less than 1 or greater than 2: " + levelThreshold); + } + } catch (NumberFormatException e) { // parse failed, so levelThreshold still holds the default: report the raw text + throw new AnalysisException("Invalid time_series_compaction_level_threshold: " + + levelThresholdStr); + } + } + return levelThreshold; + } + public static long analyzeTimeSeriesCompactionFileCountThreshold(Map<String, String> properties) throws AnalysisException { long fileCountThreshold = TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD_DEFAULT_VALUE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index b6b89c0ad7d..4e306a246c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1482,6 +1482,10 @@ public class InternalCatalog implements CatalogIf<Database> { properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD, olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold().toString()); } + if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) { + 
properties.put(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD, + olapTable.getTimeSeriesCompactionLevelThreshold().toString()); + } if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY)) { properties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, olapTable.getStoragePolicy()); } @@ -1909,6 +1913,7 @@ public class InternalCatalog implements CatalogIf<Database> { tbl.getTimeSeriesCompactionFileCountThreshold(), tbl.getTimeSeriesCompactionTimeThresholdSeconds(), tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(), + tbl.getTimeSeriesCompactionLevelThreshold(), tbl.storeRowColumn(), binlogConfig); task.setStorageFormat(tbl.getStorageFormat()); @@ -2176,7 +2181,9 @@ public class InternalCatalog implements CatalogIf<Database> { || properties .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS) || properties - .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD))) { + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD) + || properties + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD))) { throw new DdlException("only time series compaction policy support for time series config"); } @@ -2224,6 +2231,17 @@ public class InternalCatalog implements CatalogIf<Database> { } olapTable.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold); + // set time series compaction level threshold + long timeSeriesCompactionLevelThreshold + = PropertyAnalyzer.TIME_SERIES_COMPACTION_LEVEL_THRESHOLD_DEFAULT_VALUE; + try { + timeSeriesCompactionLevelThreshold = PropertyAnalyzer + .analyzeTimeSeriesCompactionLevelThreshold(properties); + } catch (AnalysisException e) { + throw new DdlException(e.getMessage()); + } + olapTable.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold); + // get storage format TStorageFormat storageFormat = TStorageFormat.V2; // default is 
segment v2 try { diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 6950dc83e9a..6b24bea1ed5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -849,6 +849,7 @@ public class ReportHandler extends Daemon { olapTable.getTimeSeriesCompactionFileCountThreshold(), olapTable.getTimeSeriesCompactionTimeThresholdSeconds(), olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold(), + olapTable.getTimeSeriesCompactionLevelThreshold(), olapTable.storeRowColumn(), binlogConfig); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java index 262ac8e84fe..7a5262ba0ed 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java @@ -116,6 +116,8 @@ public class CreateReplicaTask extends AgentTask { private long timeSeriesCompactionEmptyRowsetsThreshold; + private long timeSeriesCompactionLevelThreshold; + private boolean storeRowColumn; private BinlogConfig binlogConfig; @@ -140,6 +142,7 @@ public class CreateReplicaTask extends AgentTask { long timeSeriesCompactionFileCountThreshold, long timeSeriesCompactionTimeThresholdSeconds, long timeSeriesCompactionEmptyRowsetsThreshold, + long timeSeriesCompactionLevelThreshold, boolean storeRowColumn, BinlogConfig binlogConfig) { super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId); @@ -182,6 +185,7 @@ public class CreateReplicaTask extends AgentTask { this.timeSeriesCompactionFileCountThreshold = timeSeriesCompactionFileCountThreshold; this.timeSeriesCompactionTimeThresholdSeconds = timeSeriesCompactionTimeThresholdSeconds; this.timeSeriesCompactionEmptyRowsetsThreshold = 
timeSeriesCompactionEmptyRowsetsThreshold; + this.timeSeriesCompactionLevelThreshold = timeSeriesCompactionLevelThreshold; this.storeRowColumn = storeRowColumn; this.binlogConfig = binlogConfig; } @@ -343,6 +347,7 @@ public class CreateReplicaTask extends AgentTask { createTabletReq.setTimeSeriesCompactionFileCountThreshold(timeSeriesCompactionFileCountThreshold); createTabletReq.setTimeSeriesCompactionTimeThresholdSeconds(timeSeriesCompactionTimeThresholdSeconds); createTabletReq.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionEmptyRowsetsThreshold); + createTabletReq.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionLevelThreshold); if (binlogConfig != null) { createTabletReq.setBinlogConfig(binlogConfig.toThrift()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java index ad20b7b918d..7d4c6a3d022 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/UpdateTabletMetaInfoTask.java @@ -164,6 +164,11 @@ public class UpdateTabletMetaInfoTask extends AgentTask { metaInfo.setTimeSeriesCompactionEmptyRowsetsThreshold(timeSeriesCompactionConfig .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD)); } + if (timeSeriesCompactionConfig + .containsKey(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)) { + metaInfo.setTimeSeriesCompactionLevelThreshold(timeSeriesCompactionConfig + .get(PropertyAnalyzer.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD)); + } } if (enableSingleReplicaCompaction >= 0) { metaInfo.setEnableSingleReplicaCompaction(enableSingleReplicaCompaction > 0); diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java index 60ec442bedf..3dc4bc4695c 100644 --- 
a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java @@ -107,7 +107,7 @@ public class AgentTaskTest { createReplicaTask = new CreateReplicaTask(backendId1, dbId, tableId, partitionId, indexId1, tabletId1, replicaId1, shortKeyNum, schemaHash1, version, KeysType.AGG_KEYS, storageType, TStorageMedium.SSD, columns, null, 0, latch, null, false, TTabletType.TABLET_TYPE_DISK, null, - TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, false, null); + TCompressionType.LZ4F, false, "", false, false, false, "", 0, 0, 0, 0, 0, false, null); // drop dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1, false); diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 5eedd8af079..bea90d86006 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -113,6 +113,7 @@ message RowsetMetaPB { reserved 50; // to indicate whether the data between the segments overlap optional SegmentsOverlapPB segments_overlap_pb = 51 [default = OVERLAP_UNKNOWN]; + optional int64 compaction_level = 52 [default = 0]; } message SegmentStatisticsPB { @@ -345,6 +346,7 @@ message TabletMetaPB { optional int64 time_series_compaction_file_count_threshold = 30 [default = 2000]; optional int64 time_series_compaction_time_threshold_seconds = 31 [default = 3600]; optional int64 time_series_compaction_empty_rowsets_threshold = 32 [default = 5]; + optional int64 time_series_compaction_level_threshold = 33 [default = 1]; } message OLAPRawDeltaHeaderMessage { diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 99196c2e024..645d38eed72 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -160,6 +160,7 @@ struct TCreateTabletReq { 25: optional i64 time_series_compaction_time_threshold_seconds = 3600 26: optional i64 time_series_compaction_empty_rowsets_threshold = 5 27: 
optional TInvertedIndexStorageFormat inverted_index_storage_format = TInvertedIndexStorageFormat.V1 + 28: optional i64 time_series_compaction_level_threshold = 1 } struct TDropTabletReq { @@ -430,6 +431,7 @@ struct TTabletMetaInfo { 15: optional bool skip_write_index_on_load 16: optional bool disable_auto_compaction 17: optional i64 time_series_compaction_empty_rowsets_threshold + 18: optional i64 time_series_compaction_level_threshold } struct TUpdateTabletMetaInfoReq { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org