This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0.5
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0.5 by this push:
new e5689768f87 [fix](compaction) fix time series compaction time
threshold (#50306) (#50391)
e5689768f87 is described below
commit e5689768f873d897d8635cbb7fcda649b4c53915
Author: Sun Chenyang <[email protected]>
AuthorDate: Fri Apr 25 10:17:47 2025 +0800
[fix](compaction) fix time series compaction time threshold (#50306)
(#50391)
pick from master #50306
---
.../cumulative_compaction_time_series_policy.cpp | 37 +++++++++++++---------
...mulative_compaction_time_series_policy_test.cpp | 11 +++++--
.../test_time_series_compaction_policy.groovy | 30 ++++++++++++++++++
3 files changed, 60 insertions(+), 18 deletions(-)
diff --git a/be/src/olap/cumulative_compaction_time_series_policy.cpp
b/be/src/olap/cumulative_compaction_time_series_policy.cpp
index 64e51c77641..02af0a666f0 100644
--- a/be/src/olap/cumulative_compaction_time_series_policy.cpp
+++ b/be/src/olap/cumulative_compaction_time_series_policy.cpp
@@ -31,6 +31,7 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
bool base_rowset_exist = false;
const int64_t point = tablet->cumulative_layer_point();
+ int64_t earliest_rowset_creation_time = INT64_MAX;
int64_t level0_total_size = 0;
RowsetMetaSharedPtr first_meta;
int64_t first_version = INT64_MAX;
@@ -59,6 +60,9 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
} else {
checked_rs_metas.push_back(rs_meta);
}
+ if (rs_meta->creation_time() < earliest_rowset_creation_time) {
+ earliest_rowset_creation_time = rs_meta->creation_time();
+ }
}
}
@@ -102,21 +106,17 @@ uint32_t
TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
}
}
}
-
- int64_t now = UnixMillis();
- int64_t last_cumu = tablet->last_cumu_compaction_success_time();
- if (last_cumu != 0) {
- int64_t cumu_interval = now - last_cumu;
+ // current time in seconds
+ int64_t now = time(nullptr);
+ if (earliest_rowset_creation_time < now) {
+ int64_t cumu_interval = now - earliest_rowset_creation_time;
// Condition 4: the time interval between compactions exceeds the
value specified by parameter _compaction_time_threshold_second
if (cumu_interval >
-
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() *
1000)) {
+
tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() &&
+ score > 0) {
return score;
}
- } else if (score > 0) {
- // If the compaction process has not been successfully executed,
- // the condition for triggering compaction based on the last
successful compaction time (condition 3) will never be met
- tablet->set_last_cumu_compaction_success_time(now);
}
// Condition 5: If there is a continuous set of empty rowsets, prioritize
merging.
@@ -310,14 +310,21 @@ int
TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
}
}
- int64_t now = UnixMillis();
- int64_t last_cumu = tablet->last_cumu_compaction_success_time();
- if (last_cumu != 0) {
- int64_t cumu_interval = now - last_cumu;
+ // current time in seconds
+ int64_t now = time(nullptr);
+ if (!input_rowsets->empty()) {
+ LOG_EVERY_N(INFO, 1000)
+ << "tablet is: " << tablet->tablet_id() << ", now: " << now
+ << ", earliest rowset creation time: "
+ << input_rowsets->front()->rowset_meta()->creation_time()
+ << ", compaction_time_threshold_seconds: "
+ <<
tablet->tablet_meta()->time_series_compaction_time_threshold_seconds()
+ << ", rowset count: " << transient_size;
+ int64_t cumu_interval = now -
input_rowsets->front()->rowset_meta()->creation_time();
// Condition 4: the time interval between compactions exceeds the
value specified by parameter compaction_time_threshold_second
if (cumu_interval >
-
(tablet->tablet_meta()->time_series_compaction_time_threshold_seconds() *
1000)) {
+
tablet->tablet_meta()->time_series_compaction_time_threshold_seconds()) {
if
(tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2) {
if (input_rowsets->empty() && level1_rowsets.size() >= 2) {
input_rowsets->swap(level1_rowsets);
diff --git a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
index 01963d591be..694c5ad35bd 100644
--- a/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_time_series_policy_test.cpp
@@ -77,7 +77,7 @@ public:
json2pb::JsonToProtoMessage(_json_rowset_meta, &rowset_meta_pb);
rowset_meta_pb.set_start_version(start);
rowset_meta_pb.set_end_version(end);
- rowset_meta_pb.set_creation_time(10000);
+ rowset_meta_pb.set_creation_time(time(nullptr));
pb1->init_from_pb(rowset_meta_pb);
pb1->set_total_disk_size(41);
@@ -536,8 +536,13 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy,
pick_input_rowsets_time_interva
new Tablet(_engine, _tablet_meta, nullptr,
CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
- int64_t now = UnixMillis();
- _tablet->set_last_cumu_compaction_success_time(now - 3700 * 1000);
+
+
_tablet->_tablet_meta->set_time_series_compaction_time_threshold_seconds(1);
+ std::this_thread::sleep_for(std::chrono::seconds(2));
+
+ int score =
+
_tablet->_cumulative_compaction_policy->calc_cumulative_compaction_score(_tablet.get());
+ EXPECT_EQ(3, score);
auto candidate_rowsets =
_tablet->pick_candidate_rowsets_to_cumulative_compaction();
diff --git
a/regression-test/suites/compaction/test_time_series_compaction_policy.groovy
b/regression-test/suites/compaction/test_time_series_compaction_policy.groovy
index d211ba98bdd..69c8fb97fa7 100644
---
a/regression-test/suites/compaction/test_time_series_compaction_policy.groovy
+++
b/regression-test/suites/compaction/test_time_series_compaction_policy.groovy
@@ -120,4 +120,34 @@ suite("test_time_series_compaction_polciy", "p0") {
rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 11 * replicaNum)
qt_sql_3 """ select count() from ${tableName}"""
+
+ sql """ DROP TABLE IF EXISTS ${tableName}; """
+ sql """
+ CREATE TABLE ${tableName} (
+ `id` int(11) NULL,
+ `name` varchar(255) NULL,
+ `hobbies` text NULL,
+ `score` int(11) NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`id`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true",
+ "compaction_policy" = "time_series",
+ "time_series_compaction_time_threshold_seconds" = "70"
+ );
+ """
+
+ sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear",
99); """
+ sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear",
99); """
+ sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (100, "andy", "andy love apple",
100); """
+ sql """ INSERT INTO ${tableName} VALUES (100, "bason", "bason hate pear",
99); """
+
+ Thread.sleep(75000)
+ trigger_and_wait_compaction(tableName, "cumulative")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]