This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 46d207e5f41 branch-3.0: [Fix](compaction) Should do_lease for full
compaction #47436 (#47519)
46d207e5f41 is described below
commit 46d207e5f41d6a82b50ae8c62f57659d303eb09f
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Feb 6 14:42:30 2025 +0800
branch-3.0: [Fix](compaction) Should do_lease for full compaction #47436
(#47519)
Cherry-picked from #47436
Co-authored-by: bobhan1 <[email protected]>
---
.../cloud/cloud_cumulative_compaction_policy.cpp | 16 +++
be/src/cloud/cloud_full_compaction.cpp | 3 +
be/src/cloud/cloud_storage_engine.cpp | 9 ++
.../cloud/test_cloud_full_compaction_do_lease.out | Bin 0 -> 110 bytes
.../org/apache/doris/regression/suite/Suite.groovy | 18 +++
.../test_cloud_full_compaction_do_lease.groovy | 123 +++++++++++++++++++++
6 files changed, 169 insertions(+)
diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
index 6445b47fc59..9e3ca3eb3db 100644
--- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp
@@ -54,6 +54,22 @@ int64_t
CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
const int64_t max_compaction_score, const int64_t min_compaction_score,
std::vector<RowsetSharedPtr>* input_rowsets, Version*
last_delete_version,
size_t* compaction_score, bool allow_delete) {
+ DBUG_EXECUTE_IF(
+
"CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
{
+ auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
+ if (target_tablet_id == tablet->tablet_id()) {
+ auto start_version = dp->param<int64_t>("start_version",
-1);
+ auto end_version = dp->param<int64_t>("end_version", -1);
+ for (auto& rowset : candidate_rowsets) {
+ if (rowset->start_version() >= start_version &&
+ rowset->end_version() <= end_version) {
+ input_rowsets->push_back(rowset);
+ }
+ }
+ }
+ return input_rowsets->size();
+ })
+
size_t promotion_size = cloud_promotion_size(tablet);
auto max_version = tablet->max_version().first;
int transient_size = 0;
diff --git a/be/src/cloud/cloud_full_compaction.cpp
b/be/src/cloud/cloud_full_compaction.cpp
index bce00c9a2e7..f983e57ebe0 100644
--- a/be/src/cloud/cloud_full_compaction.cpp
+++ b/be/src/cloud/cloud_full_compaction.cpp
@@ -30,6 +30,7 @@
#include "olap/rowset/beta_rowset.h"
#include "olap/tablet_meta.h"
#include "service/backend_options.h"
+#include "util/debug_points.h"
#include "util/thread.h"
#include "util/uuid_generator.h"
#include "vec/columns/column.h"
@@ -221,6 +222,8 @@ Status CloudFullCompaction::modify_rowsets() {
compaction_job->set_index_size_output_rowsets(_output_rowset->index_disk_size());
compaction_job->set_segment_size_output_rowsets(_output_rowset->data_disk_size());
+ DBUG_EXECUTE_IF("CloudFullCompaction::modify_rowsets.block", DBUG_BLOCK);
+
DeleteBitmapPtr output_rowset_delete_bitmap = nullptr;
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
diff --git a/be/src/cloud/cloud_storage_engine.cpp
b/be/src/cloud/cloud_storage_engine.cpp
index ee4f8b611a6..a8768554cff 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -749,6 +749,7 @@ Status CloudStorageEngine::submit_compaction_task(const
CloudTabletSPtr& tablet,
void CloudStorageEngine::_lease_compaction_thread_callback() {
while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::lease_compaction_interval_seconds))) {
+ std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions;
std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions;
std::vector<std::shared_ptr<CloudCumulativeCompaction>>
cumu_compactions;
{
@@ -763,8 +764,16 @@ void
CloudStorageEngine::_lease_compaction_thread_callback() {
cumu_compactions.push_back(cumu);
}
}
+ for (auto& [_, full] : _submitted_full_compactions) {
+ if (full) {
+ full_compactions.push_back(full);
+ }
+ }
}
// TODO(plat1ko): Support batch lease rpc
+ for (auto& comp : full_compactions) {
+ comp->do_lease();
+ }
for (auto& comp : cumu_compactions) {
comp->do_lease();
}
diff --git
a/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out
b/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out
new file mode 100644
index 00000000000..6e498c12c60
Binary files /dev/null and
b/regression-test/data/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.out
differ
diff --git
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index 4c6fdcb5e6b..47826ca639e 100644
---
a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++
b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -1724,6 +1724,24 @@ class Suite implements GroovyInterceptable {
}
}
+ void setBeConfigTemporary(Map<String, Object> tempConfig, Closure
actionSupplier) {
+ Map<String, Map<String, String>> originConf = Maps.newHashMap()
+ tempConfig.each{ k, v ->
+ originConf.put(k, get_be_param(k))
+ }
+ try {
+ tempConfig.each{ k, v -> set_be_param(k, v)}
+ actionSupplier()
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ throw e
+ } finally {
+ originConf.each { k, confs ->
+ set_original_be_param(k, confs)
+ }
+ }
+ }
+
void waitAddFeFinished(String host, int port) {
logger.info("waiting for ${host}:${port}")
Awaitility.await().atMost(60, TimeUnit.SECONDS).with().pollDelay(100,
TimeUnit.MILLISECONDS).and()
diff --git
a/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy
new file mode 100644
index 00000000000..1910788d92f
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/cloud/test_cloud_full_compaction_do_lease.groovy
@@ -0,0 +1,123 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.junit.Assert
+
+suite("test_cloud_full_compaction_do_lease","nonConcurrent") {
+ if (!isCloudMode()) {
+ return
+ }
+
+ def tableName = "test_cloud_full_compaction_do_lease"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ CREATE TABLE ${tableName}
+ (k int, v1 int, v2 int )
+ UNIQUE KEY(k)
+ DISTRIBUTED BY HASH (k)
+ BUCKETS 1 PROPERTIES(
+ "replication_num" = "1",
+ "enable_unique_key_merge_on_write"="true",
+ "disable_auto_compaction" = "true");
+ """
+
+ (1..20).each{ id ->
+ sql """insert into ${tableName} select number, number, number from
numbers("number"="10");"""
+ }
+
+ qt_sql "select count(1) from ${tableName};"
+
+ def backends = sql_return_maparray('show backends')
+ def tabletStats = sql_return_maparray("show tablets from ${tableName};")
+ assert tabletStats.size() == 1
+ def tabletId = tabletStats[0].TabletId
+ def tabletBackendId = tabletStats[0].BackendId
+ def tabletBackend
+ for (def be : backends) {
+ if (be.BackendId == tabletBackendId) {
+ tabletBackend = be
+ break;
+ }
+ }
+ logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with
backendId=${tabletBackend.BackendId}");
+
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+
+ def customBeConfig = [
+ lease_compaction_interval_seconds : 2
+ ]
+
+ setBeConfigTemporary(customBeConfig) {
+ // the default value of lease_compaction_interval_seconds is 20s,
which means
+ // the compaction lease thread will sleep for 20s first; we sleep 20s
here
+ // so that the compaction lease thread is then scheduled at the interval we expect (2s)
+ Thread.sleep(20000)
+ try {
+ // block the full compaction
+
GetDebugPoint().enableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block")
+
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id:"${tabletId}", start_version:"2",
end_version:"10"]);
+
+ {
+ // trigger full compaction, it will be blocked in
modify_rowsets
+ logger.info("trigger full compaction on BE
${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+ def (code, out, err) =
be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+ logger.info("Run compaction: code=" + code + ", out=" + out +
", err=" + err)
+ assert code == 0
+ def compactJson = parseJson(out.trim())
+ assert "success" == compactJson.status.toLowerCase()
+ }
+
+ // wait until the full compaction job's lease would have timed out
(lease_compaction_interval_seconds * 4)
+ Thread.sleep(10000);
+
+ {
+ // trigger cumu compaction
+ logger.info("trigger cumu compaction on BE
${tabletBackend.Host} with backendId=${tabletBackend.BackendId}")
+ def (code, out, err) =
be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort,
tabletId)
+ logger.info("Run compaction: code=" + code + ", out=" + out +
", err=" + err)
+ assert code == 0
+ def compactJson = parseJson(out.trim())
+ // this will fail due to existing full compaction
+ assert "e-2000" == compactJson.status.toLowerCase()
+ }
+
+ Thread.sleep(1000);
+
+ // unblock full compaction
+
GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block")
+
+ Thread.sleep(3000);
+
+ {
+ def (code, out, err) =
be_show_tablet_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId)
+ assert code == 0
+ def compactJson = parseJson(out.trim())
+ assert compactJson["rowsets"].toString().contains("[2-21]")
+ }
+
+
+ } catch (Exception e) {
+ logger.info(e.getMessage())
+ assert false
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("CloudFullCompaction::modify_rowsets.block")
+
GetDebugPoint().disableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets")
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]