This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 69912ecaafd branch-3.0: [fix](mow) remove delete bitmap when remove
unused rowsets (#50973) (#51871)
69912ecaafd is described below
commit 69912ecaafde0a020d85fe67f51e2a109a6f9ae3
Author: meiyi <[email protected]>
AuthorDate: Thu Jun 19 16:44:02 2025 +0800
branch-3.0: [fix](mow) remove delete bitmap when remove unused rowsets
(#50973) (#51871)
pick https://github.com/apache/doris/pull/50973
---
be/src/cloud/cloud_cumulative_compaction.cpp | 4 +
be/src/cloud/cloud_delete_bitmap_action.cpp | 182 ---------------
be/src/cloud/cloud_tablet.cpp | 10 +-
be/src/http/action/delete_bitmap_action.cpp | 213 +++++++++++++++++
.../action/delete_bitmap_action.h} | 13 +-
be/src/olap/compaction.cpp | 2 +
be/src/olap/storage_engine.cpp | 35 ++-
be/src/olap/tablet.cpp | 2 +
be/src/olap/tablet_meta.cpp | 26 +--
be/src/olap/tablet_meta.h | 1 +
be/src/service/http_service.cpp | 21 +-
be/src/vec/exec/scan/new_olap_scanner.cpp | 2 +
.../test_mow_delete_unused_rowset_dm_docker.out | Bin 0 -> 219 bytes
.../test_schema_change_add_key_column.csv.gz | Bin 0 -> 72233 bytes
.../test_schema_change_add_key_column1.csv.gz | Bin 0 -> 177558 bytes
.../test_mow_compact_multi_segments.groovy | 253 +++++++++++++++++++++
.../test_mow_delete_unused_rowset_dm_docker.groovy | 231 +++++++++++++++++++
.../test_mow_stale_rowset_delete_bitmap.groovy | 249 ++++++++++++++++++++
18 files changed, 1021 insertions(+), 223 deletions(-)
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp
b/be/src/cloud/cloud_cumulative_compaction.cpp
index 097bdc9bf4a..8546ac04fcf 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -447,6 +447,10 @@ Status
CloudCumulativeCompaction::process_old_version_delete_bitmap() {
{
static_cast<CloudTablet*>(_tablet.get())->delete_expired_stale_rowsets(); });
}
}
+
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset",
{
+ LOG(INFO) << "delete_expired_stale_rowsets for tablet=" <<
_tablet->tablet_id();
+ _engine.tablet_mgr().vacuum_stale_rowsets(CountDownLatch(1));
+ });
return Status::OK();
}
diff --git a/be/src/cloud/cloud_delete_bitmap_action.cpp
b/be/src/cloud/cloud_delete_bitmap_action.cpp
deleted file mode 100644
index 3d834bfe7b3..00000000000
--- a/be/src/cloud/cloud_delete_bitmap_action.cpp
+++ /dev/null
@@ -1,182 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "cloud_delete_bitmap_action.h"
-
-#include <rapidjson/document.h>
-#include <rapidjson/encodings.h>
-#include <rapidjson/prettywriter.h>
-#include <rapidjson/rapidjson.h>
-#include <rapidjson/stringbuffer.h>
-
-#include <chrono> // IWYU pragma: keep
-#include <exception>
-#include <future>
-#include <memory>
-#include <mutex>
-#include <sstream>
-#include <string>
-#include <thread>
-#include <utility>
-
-#include "cloud/cloud_meta_mgr.h"
-#include "cloud/cloud_tablet.h"
-#include "cloud/cloud_tablet_mgr.h"
-#include "common/logging.h"
-#include "common/status.h"
-#include "gutil/strings/substitute.h"
-#include "http/http_channel.h"
-#include "http/http_headers.h"
-#include "http/http_request.h"
-#include "http/http_status.h"
-#include "olap/olap_define.h"
-#include "olap/storage_engine.h"
-#include "olap/tablet_manager.h"
-#include "util/doris_metrics.h"
-#include "util/stopwatch.hpp"
-
-namespace doris {
-#include "common/compile_check_begin.h"
-using namespace ErrorCode;
-
-namespace {
-
-constexpr std::string_view HEADER_JSON = "application/json";
-
-} // namespace
-
-CloudDeleteBitmapAction::CloudDeleteBitmapAction(DeleteBitmapActionType ctype,
ExecEnv* exec_env,
- CloudStorageEngine& engine,
- TPrivilegeHier::type hier,
- TPrivilegeType::type ptype)
- : HttpHandlerWithAuth(exec_env, hier, ptype),
- _engine(engine),
- _delete_bitmap_action_type(ctype) {}
-
-static Status _check_param(HttpRequest* req, uint64_t* tablet_id) {
- const auto& req_tablet_id = req->param(TABLET_ID_KEY);
- if (req_tablet_id.empty()) {
- return Status::InternalError("tablet id is empty!");
- }
- try {
- *tablet_id = std::stoull(req_tablet_id);
- } catch (const std::exception& e) {
- return Status::InternalError("convert tablet_id failed, {}", e.what());
- }
- return Status::OK();
-}
-
-Status
CloudDeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest*
req,
-
std::string* json_result) {
- uint64_t tablet_id = 0;
- // check & retrieve tablet_id from req if it contains
- RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param
failed");
- if (tablet_id == 0) {
- return Status::InternalError("check param failed: missing tablet_id");
- }
-
- CloudTabletSPtr tablet =
DORIS_TRY(_engine.tablet_mgr().get_tablet(tablet_id));
- if (tablet == nullptr) {
- return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
- }
-
- auto count =
tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
- auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality();
- auto size = tablet->tablet_meta()->delete_bitmap().get_size();
- LOG(INFO) << "show_local_delete_bitmap_count,tablet_id=" << tablet_id <<
",count=" << count
- << ",cardinality=" << cardinality << ",size=" << size;
-
- rapidjson::Document root;
- root.SetObject();
- root.AddMember("delete_bitmap_count", count, root.GetAllocator());
- root.AddMember("cardinality", cardinality, root.GetAllocator());
- root.AddMember("size", size, root.GetAllocator());
-
- // to json string
- rapidjson::StringBuffer strbuf;
- rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
- root.Accept(writer);
- *json_result = std::string(strbuf.GetString());
-
- return Status::OK();
-}
-
-Status
CloudDeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
-
std::string* json_result) {
- uint64_t tablet_id = 0;
- // check & retrieve tablet_id from req if it contains
- RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id), "check param
failed");
- if (tablet_id == 0) {
- return Status::InternalError("check param failed: missing tablet_id");
- }
- TabletMetaSharedPtr tablet_meta;
- auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
- if (!st.ok()) {
- LOG(WARNING) << "failed to get_tablet_meta tablet=" << tablet_id
- << ", st=" << st.to_string();
- return st;
- }
- auto tablet = std::make_shared<CloudTablet>(_engine,
std::move(tablet_meta));
- st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), false, true,
true);
- if (!st.ok()) {
- LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st;
- return st;
- }
- auto count =
tablet->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
- auto cardinality = tablet->tablet_meta()->delete_bitmap().cardinality();
- auto size = tablet->tablet_meta()->delete_bitmap().get_size();
- LOG(INFO) << "show_ms_delete_bitmap_count,tablet_id=" << tablet_id <<
",count=" << count
- << ",cardinality=" << cardinality << ",size=" << size;
-
- rapidjson::Document root;
- root.SetObject();
- root.AddMember("delete_bitmap_count", count, root.GetAllocator());
- root.AddMember("cardinality", cardinality, root.GetAllocator());
- root.AddMember("size", size, root.GetAllocator());
-
- // to json string
- rapidjson::StringBuffer strbuf;
- rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
- root.Accept(writer);
- *json_result = std::string(strbuf.GetString());
-
- return Status::OK();
-}
-
-void CloudDeleteBitmapAction::handle(HttpRequest* req) {
- req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
- if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) {
- std::string json_result;
- Status st = _handle_show_local_delete_bitmap_count(req, &json_result);
- if (!st.ok()) {
- HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
- } else {
- HttpChannel::send_reply(req, HttpStatus::OK, json_result);
- }
- } else if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_MS)
{
- std::string json_result;
- Status st = _handle_show_ms_delete_bitmap_count(req, &json_result);
- if (!st.ok()) {
- HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
- } else {
- HttpChannel::send_reply(req, HttpStatus::OK, json_result);
- }
- }
-}
-
-#include "common/compile_check_end.h"
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index c044b8361b7..b4cf9d2e31c 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -493,18 +493,16 @@ void CloudTablet::remove_unused_rowsets() {
++it;
continue;
}
+ tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(),
rs->version());
rs->clear_cache();
it = _unused_rowsets.erase(it);
g_unused_rowsets_count << -1;
removed_rowsets_num++;
}
- if (removed_rowsets_num > 0) {
- LOG(INFO) << "tablet_id=" << tablet_id()
- << ", unused_rowset size=" << _unused_rowsets.size()
- << ", removed_rowsets_num=" << removed_rowsets_num
- << ", cost(us)=" << watch.get_elapse_time_us();
- }
+ LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" <<
_unused_rowsets.size()
+ << ", removed_rowsets_num=" << removed_rowsets_num
+ << ", cost(us)=" << watch.get_elapse_time_us();
}
void CloudTablet::update_base_size(const Rowset& rs) {
diff --git a/be/src/http/action/delete_bitmap_action.cpp
b/be/src/http/action/delete_bitmap_action.cpp
new file mode 100644
index 00000000000..59783d1c055
--- /dev/null
+++ b/be/src/http/action/delete_bitmap_action.cpp
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "delete_bitmap_action.h"
+
+#include <rapidjson/document.h>
+#include <rapidjson/encodings.h>
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/stringbuffer.h>
+
+#include <exception>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "cloud/cloud_tablet.h"
+#include "cloud/cloud_tablet_mgr.h"
+#include "cloud/config.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "olap/olap_define.h"
+#include "olap/tablet_manager.h"
+
+namespace doris {
+#include "common/compile_check_begin.h"
+using namespace ErrorCode;
+
+namespace {
+
+constexpr std::string_view HEADER_JSON = "application/json";
+
+} // namespace
+
+DeleteBitmapAction::DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv*
exec_env,
+ BaseStorageEngine& engine,
TPrivilegeHier::type hier,
+ TPrivilegeType::type ptype)
+ : HttpHandlerWithAuth(exec_env, hier, ptype),
+ _engine(engine),
+ _delete_bitmap_action_type(ctype) {}
+
+static Status _check_param(HttpRequest* req, uint64_t* tablet_id, bool*
verbose) {
+ const auto& req_tablet_id = req->param(TABLET_ID_KEY);
+ if (req_tablet_id.empty()) {
+ return Status::InternalError<false>("tablet id is empty!");
+ }
+ try {
+ *tablet_id = std::stoull(req_tablet_id);
+ } catch (const std::exception& e) {
+ return Status::InternalError<false>("convert tablet_id failed, {}",
e.what());
+ }
+ if (*tablet_id == 0) {
+ return Status::InternalError<false>("check param failed: invalid
tablet_id");
+ }
+ *verbose = iequal(req->param("verbose"), "true");
+ return Status::OK();
+}
+
+static void _show_delete_bitmap(DeleteBitmap& dm, bool verbose, std::string*
json_result) {
+ auto count = dm.get_delete_bitmap_count();
+ auto cardinality = dm.cardinality();
+ auto size = dm.get_size();
+ rapidjson::Document root;
+ root.SetObject();
+ root.AddMember("delete_bitmap_count", count, root.GetAllocator());
+ root.AddMember("cardinality", cardinality, root.GetAllocator());
+ root.AddMember("size", size, root.GetAllocator());
+ if (verbose) {
+ std::string pre_rowset_id = "";
+ int64_t pre_segment_id = -1;
+ std::vector<std::string> version_vector;
+ rapidjson::Document dm_arr;
+ dm_arr.SetObject();
+
+ auto add_rowset_delete_bitmap_info = [&]() {
+ std::string key =
+ "rowset: " + pre_rowset_id + ", segment: " +
std::to_string(pre_segment_id);
+ rapidjson::Value key_value;
+ key_value.SetString(key.data(),
static_cast<uint32_t>(key.length()),
+ root.GetAllocator());
+ rapidjson::Document version_arr;
+ version_arr.SetArray();
+ for (const auto& str : version_vector) {
+ rapidjson::Value value;
+ value.SetString(str.c_str(),
static_cast<uint32_t>(str.length()),
+ root.GetAllocator());
+ version_arr.PushBack(value, root.GetAllocator());
+ }
+ dm_arr.AddMember(key_value, version_arr, root.GetAllocator());
+ version_vector.clear();
+ };
+
+ for (auto& [id, bitmap] : dm.delete_bitmap) {
+ auto& [rowset_id, segment_id, version] = id;
+ if (rowset_id.to_string() != pre_rowset_id || segment_id !=
pre_segment_id) {
+ // add previous result
+ if (!pre_rowset_id.empty()) {
+ add_rowset_delete_bitmap_info();
+ }
+ pre_rowset_id = rowset_id.to_string();
+ pre_segment_id = segment_id;
+ }
+ std::string str = fmt::format("v: {}, c: {}, s: {}", version,
bitmap.cardinality(),
+ bitmap.getSizeInBytes());
+ version_vector.push_back(str);
+ }
+ // add last result
+ if (!version_vector.empty()) {
+ add_rowset_delete_bitmap_info();
+ }
+ root.AddMember("delete_bitmap", dm_arr, root.GetAllocator());
+ }
+
+ // to json string
+ rapidjson::StringBuffer strbuf;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
+ root.Accept(writer);
+ *json_result = std::string(strbuf.GetString());
+}
+
+Status DeleteBitmapAction::_handle_show_local_delete_bitmap_count(HttpRequest*
req,
+ std::string*
json_result) {
+ uint64_t tablet_id = 0;
+ bool verbose = false;
+ RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &verbose),
"check param failed");
+
+ BaseTabletSPtr tablet = nullptr;
+ if (config::is_cloud_mode()) {
+ tablet =
DORIS_TRY(_engine.to_cloud().tablet_mgr().get_tablet(tablet_id));
+ DBUG_EXECUTE_IF(
+
"DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets",
+ {
_engine.to_cloud().tablet_mgr().vacuum_stale_rowsets(CountDownLatch(1)); });
+ } else {
+ tablet = _engine.to_local().tablet_manager()->get_tablet(tablet_id);
+ DBUG_EXECUTE_IF(
+
"DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_"
+ "rowset",
+ { _engine.to_local().start_delete_unused_rowset(); });
+ }
+ if (tablet == nullptr) {
+ return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
+ }
+ auto dm = tablet->tablet_meta()->delete_bitmap().snapshot();
+ _show_delete_bitmap(dm, verbose, json_result);
+ return Status::OK();
+}
+
+Status DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest*
req,
+ std::string*
json_result) {
+ uint64_t tablet_id = 0;
+ bool verbose = false;
+ RETURN_NOT_OK_STATUS_WITH_WARN(_check_param(req, &tablet_id, &verbose),
"check param failed");
+
+ TabletMetaSharedPtr tablet_meta;
+ auto st = _engine.to_cloud().meta_mgr().get_tablet_meta(tablet_id,
&tablet_meta);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to get_tablet_meta for tablet=" << tablet_id
+ << ", st=" << st.to_string();
+ return st;
+ }
+ auto tablet = std::make_shared<CloudTablet>(_engine.to_cloud(),
std::move(tablet_meta));
+ st = _engine.to_cloud().meta_mgr().sync_tablet_rowsets(tablet.get(),
false, true, true);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st;
+ return st;
+ }
+ auto dm = tablet->tablet_meta()->delete_bitmap().snapshot();
+ _show_delete_bitmap(dm, verbose, json_result);
+ return Status::OK();
+}
+
+void DeleteBitmapAction::handle(HttpRequest* req) {
+ req->add_output_header(HttpHeaders::CONTENT_TYPE, HEADER_JSON.data());
+ if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_LOCAL) {
+ std::string json_result;
+ Status st = _handle_show_local_delete_bitmap_count(req, &json_result);
+ if (!st.ok()) {
+ HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
+ } else {
+ HttpChannel::send_reply(req, HttpStatus::OK, json_result);
+ }
+ } else if (_delete_bitmap_action_type == DeleteBitmapActionType::COUNT_MS)
{
+ std::string json_result;
+ Status st = _handle_show_ms_delete_bitmap_count(req, &json_result);
+ if (!st.ok()) {
+ HttpChannel::send_reply(req, HttpStatus::OK, st.to_json());
+ } else {
+ HttpChannel::send_reply(req, HttpStatus::OK, json_result);
+ }
+ }
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/cloud/cloud_delete_bitmap_action.h
b/be/src/http/action/delete_bitmap_action.h
similarity index 78%
rename from be/src/cloud/cloud_delete_bitmap_action.h
rename to be/src/http/action/delete_bitmap_action.h
index ce507ee9991..284e8dbcf57 100644
--- a/be/src/cloud/cloud_delete_bitmap_action.h
+++ b/be/src/http/action/delete_bitmap_action.h
@@ -21,9 +21,9 @@
#include <string>
-#include "cloud/cloud_storage_engine.h"
#include "common/status.h"
#include "http/http_handler_with_auth.h"
+#include "olap/storage_engine.h"
#include "olap/tablet.h"
namespace doris {
@@ -35,13 +35,12 @@ class ExecEnv;
enum class DeleteBitmapActionType { COUNT_LOCAL = 1, COUNT_MS = 2 };
/// This action is used for viewing the delete bitmap status
-class CloudDeleteBitmapAction : public HttpHandlerWithAuth {
+class DeleteBitmapAction : public HttpHandlerWithAuth {
public:
- CloudDeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
- CloudStorageEngine& engine, TPrivilegeHier::type
hier,
- TPrivilegeType::type ptype);
+ DeleteBitmapAction(DeleteBitmapActionType ctype, ExecEnv* exec_env,
BaseStorageEngine& engine,
+ TPrivilegeHier::type hier, TPrivilegeType::type ptype);
- ~CloudDeleteBitmapAction() override = default;
+ ~DeleteBitmapAction() override = default;
void handle(HttpRequest* req) override;
@@ -50,7 +49,7 @@ private:
Status _handle_show_ms_delete_bitmap_count(HttpRequest* req, std::string*
json_result);
private:
- CloudStorageEngine& _engine;
+ BaseStorageEngine& _engine;
DeleteBitmapActionType _delete_bitmap_action_type;
};
#include "common/compile_check_end.h"
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 1f21795e1c9..dccf56e9c9b 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -1231,6 +1231,8 @@ Status CompactionMixin::modify_rowsets() {
LOG(WARNING) << "failed to remove old version delete bitmap, st: "
<< st;
}
}
+
DBUG_EXECUTE_IF("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset",
+ { tablet()->delete_expired_stale_rowset(); });
return Status::OK();
}
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 7188402e956..02234b326aa 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -944,15 +944,28 @@ void StorageEngine::_clean_unused_rowset_metas() {
for (auto data_dir : data_dirs) {
static_cast<void>(
RowsetMetaManager::traverse_rowset_metas(data_dir->get_meta(),
clean_rowset_func));
+ // 1. delete delete_bitmap
+ std::set<int64_t> tablets_to_save_meta;
for (auto& rowset_meta : invalid_rowset_metas) {
- static_cast<void>(RowsetMetaManager::remove(
- data_dir->get_meta(), rowset_meta->tablet_uid(),
rowset_meta->rowset_id()));
TabletSharedPtr tablet =
_tablet_manager->get_tablet(rowset_meta->tablet_id());
if (tablet &&
tablet->tablet_meta()->enable_unique_key_merge_on_write()) {
-
tablet->tablet_meta()->delete_bitmap().remove_rowset_cache_version(
- rowset_meta->rowset_id());
+
tablet->tablet_meta()->remove_rowset_delete_bitmap(rowset_meta->rowset_id(),
+
rowset_meta->version());
+ tablets_to_save_meta.emplace(tablet->tablet_id());
}
}
+ for (const auto& tablet_id : tablets_to_save_meta) {
+ auto tablet = _tablet_manager->get_tablet(tablet_id);
+ if (tablet) {
+ std::shared_lock rlock(tablet->get_header_lock());
+ tablet->save_meta();
+ }
+ }
+ // 2. delete rowset meta
+ for (auto& rowset_meta : invalid_rowset_metas) {
+ static_cast<void>(RowsetMetaManager::remove(
+ data_dir->get_meta(), rowset_meta->tablet_uid(),
rowset_meta->rowset_id()));
+ }
LOG(INFO) << "remove " << invalid_rowset_metas.size()
<< " invalid rowset meta from dir: " << data_dir->path();
invalid_rowset_metas.clear();
@@ -1186,6 +1199,7 @@ void StorageEngine::_parse_default_rowset_type() {
}
void StorageEngine::start_delete_unused_rowset() {
+ DBUG_EXECUTE_IF("StorageEngine::start_delete_unused_rowset.block",
DBUG_BLOCK);
LOG(INFO) << "start to delete unused rowset, size: " <<
_unused_rowsets.size();
std::vector<RowsetSharedPtr> unused_rowsets_copy;
unused_rowsets_copy.reserve(_unused_rowsets.size());
@@ -1218,20 +1232,27 @@ void StorageEngine::start_delete_unused_rowset() {
<< due_to_use_count << " rowsets due to use count > 1, skipped "
<< due_to_not_delete_file << " rowsets due to don't need to
delete file, skipped "
<< due_to_delayed_expired_ts << " rowsets due to delayed expired
timestamp.";
+ std::set<int64_t> tablets_to_save_meta;
for (auto&& rs : unused_rowsets_copy) {
VLOG_NOTICE << "start to remove rowset:" << rs->rowset_id()
<< ", version:" << rs->version();
// delete delete_bitmap of unused rowsets
if (auto tablet =
_tablet_manager->get_tablet(rs->rowset_meta()->tablet_id());
tablet && tablet->enable_unique_key_merge_on_write()) {
- tablet->tablet_meta()->delete_bitmap().remove({rs->rowset_id(), 0,
0},
- {rs->rowset_id(),
UINT32_MAX, 0});
-
tablet->tablet_meta()->delete_bitmap().remove_rowset_cache_version(rs->rowset_id());
+
tablet->tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(),
rs->version());
+ tablets_to_save_meta.emplace(tablet->tablet_id());
}
Status status = rs->remove();
unused_rowsets_counter << -1;
VLOG_NOTICE << "remove rowset:" << rs->rowset_id() << " finished.
status:" << status;
}
+ for (const auto& tablet_id : tablets_to_save_meta) {
+ auto tablet = _tablet_manager->get_tablet(tablet_id);
+ if (tablet) {
+ std::shared_lock rlock(tablet->get_header_lock());
+ tablet->save_meta();
+ }
+ }
LOG(INFO) << "removed all collected unused rowsets";
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 9ff7ec74eda..5c4770e3a33 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -861,6 +861,8 @@ void Tablet::delete_expired_stale_rowset() {
if (config::enable_mow_verbose_log) {
LOG_INFO("finish delete_expired_stale_rowset for tablet={}",
tablet_id());
}
+
DBUG_EXECUTE_IF("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset",
+ { _engine.start_delete_unused_rowset(); });
}
Status Tablet::capture_consistent_versions_unlocked(const Version&
spec_version,
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 5516196641d..3a0ff3419ee 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -453,6 +453,18 @@ void TabletMeta::init_column_from_tcolumn(uint32_t
unique_id, const TColumn& tco
}
}
+void TabletMeta::remove_rowset_delete_bitmap(const RowsetId& rowset_id, const
Version& version) {
+ if (_enable_unique_key_merge_on_write) {
+ delete_bitmap().remove({rowset_id, 0, 0}, {rowset_id, UINT32_MAX, 0});
+ if (config::enable_mow_verbose_log) {
+ LOG_INFO("delete rowset delete bitmap. tablet={}, rowset={},
version={}", tablet_id(),
+ rowset_id.to_string(), version.to_string());
+ }
+ size_t rowset_cache_version_size =
delete_bitmap().remove_rowset_cache_version(rowset_id);
+ _check_mow_rowset_cache_version_size(rowset_cache_version_size);
+ }
+}
+
Status TabletMeta::create_from_file(const string& file_path) {
TabletMetaPB tablet_meta_pb;
RETURN_IF_ERROR(load_from_file(file_path, &tablet_meta_pb));
@@ -943,28 +955,14 @@ void TabletMeta::revise_delete_bitmap_unlocked(const
DeleteBitmap& delete_bitmap
}
void TabletMeta::delete_stale_rs_meta_by_version(const Version& version) {
- size_t rowset_cache_version_size = 0;
auto it = _stale_rs_metas.begin();
while (it != _stale_rs_metas.end()) {
if ((*it)->version() == version) {
- if (_enable_unique_key_merge_on_write) {
- // remove rowset delete bitmap
- delete_bitmap().remove({(*it)->rowset_id(), 0, 0},
- {(*it)->rowset_id(), UINT32_MAX, 0});
- rowset_cache_version_size =
-
delete_bitmap().remove_rowset_cache_version((*it)->rowset_id());
- if (config::enable_mow_verbose_log) {
- LOG_INFO(
- "delete stale rowset's delete bitmap. tablet={},
version={}, rowset={}",
- tablet_id(), version.to_string(),
(*it)->rowset_id().to_string());
- }
- }
it = _stale_rs_metas.erase(it);
} else {
it++;
}
}
- _check_mow_rowset_cache_version_size(rowset_cache_version_size);
}
RowsetMetaSharedPtr TabletMeta::acquire_rs_meta_by_version(const Version&
version) const {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index a0a2ab3d321..388ddc439dc 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -242,6 +242,7 @@ public:
ColumnPB* column);
DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
+ void remove_rowset_delete_bitmap(const RowsetId& rowset_id, const Version&
version);
bool enable_unique_key_merge_on_write() const { return
_enable_unique_key_merge_on_write; }
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index 43506800c43..32ea86b09b1 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -25,7 +25,6 @@
#include <vector>
#include "cloud/cloud_compaction_action.h"
-#include "cloud/cloud_delete_bitmap_action.h"
#include "cloud/config.h"
#include "cloud/injection_point_action.h"
#include "common/config.h"
@@ -43,6 +42,7 @@
#include "http/action/compaction_score_action.h"
#include "http/action/config_action.h"
#include "http/action/debug_point_action.h"
+#include "http/action/delete_bitmap_action.h"
#include "http/action/download_action.h"
#include "http/action/download_binlog_action.h"
#include "http/action/file_cache_action.h"
@@ -384,6 +384,13 @@ void HttpService::register_local_handler(StorageEngine&
engine) {
_ev_http_server->register_handler(HttpMethod::GET,
"/api/compaction/run_status",
run_status_compaction_action);
+
+ DeleteBitmapAction* count_delete_bitmap_action =
+ _pool.add(new
DeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine,
+ TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN));
+ _ev_http_server->register_handler(HttpMethod::GET,
"/api/delete_bitmap/count_local",
+ count_delete_bitmap_action);
+
CheckTabletSegmentAction* check_tablet_segment_action = _pool.add(new
CheckTabletSegmentAction(
_env, engine, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::POST,
"/api/check_tablet_segment_lost",
@@ -432,14 +439,14 @@ void
HttpService::register_cloud_handler(CloudStorageEngine& engine) {
TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET,
"/api/compaction/run_status",
run_status_compaction_action);
- CloudDeleteBitmapAction* count_local_delete_bitmap_action =
- _pool.add(new
CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine,
- TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN));
+ DeleteBitmapAction* count_local_delete_bitmap_action =
+ _pool.add(new
DeleteBitmapAction(DeleteBitmapActionType::COUNT_LOCAL, _env, engine,
+ TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET,
"/api/delete_bitmap/count_local",
count_local_delete_bitmap_action);
- CloudDeleteBitmapAction* count_ms_delete_bitmap_action =
- _pool.add(new
CloudDeleteBitmapAction(DeleteBitmapActionType::COUNT_MS, _env, engine,
- TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN));
+ DeleteBitmapAction* count_ms_delete_bitmap_action =
+ _pool.add(new DeleteBitmapAction(DeleteBitmapActionType::COUNT_MS,
_env, engine,
+ TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN));
_ev_http_server->register_handler(HttpMethod::GET,
"/api/delete_bitmap/count_ms",
count_ms_delete_bitmap_action);
#ifdef ENABLE_INJECTION_POINT
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 18b3571299f..0970ded03e9 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -372,6 +372,8 @@ Status NewOlapScanner::_init_tablet_reader_params(
_tablet_reader_params.delete_bitmap =
&tablet->tablet_meta()->delete_bitmap();
}
+ DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block",
DBUG_BLOCK);
+
if (!_state->skip_storage_engine_merge()) {
TOlapScanNode& olap_scan_node =
((pipeline::OlapScanLocalState*)_local_state)->olap_scan_node();
diff --git
a/regression-test/data/compaction/test_mow_delete_unused_rowset_dm_docker.out
b/regression-test/data/compaction/test_mow_delete_unused_rowset_dm_docker.out
new file mode 100644
index 00000000000..9896795dfd4
Binary files /dev/null and
b/regression-test/data/compaction/test_mow_delete_unused_rowset_dm_docker.out
differ
diff --git
a/regression-test/data/compaction/test_schema_change_add_key_column.csv.gz
b/regression-test/data/compaction/test_schema_change_add_key_column.csv.gz
new file mode 100644
index 00000000000..bc9d3dd70ea
Binary files /dev/null and
b/regression-test/data/compaction/test_schema_change_add_key_column.csv.gz
differ
diff --git
a/regression-test/data/compaction/test_schema_change_add_key_column1.csv.gz
b/regression-test/data/compaction/test_schema_change_add_key_column1.csv.gz
new file mode 100644
index 00000000000..83f1ecd41c0
Binary files /dev/null and
b/regression-test/data/compaction/test_schema_change_add_key_column1.csv.gz
differ
diff --git
a/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy
b/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy
new file mode 100644
index 00000000000..f286a2becae
--- /dev/null
+++ b/regression-test/suites/compaction/test_mow_compact_multi_segments.groovy
@@ -0,0 +1,253 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// when compacting a single rowset that has multiple segments, its delete bitmap can be deleted
+suite("test_mow_compact_multi_segments", "nonConcurrent") {
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ def tableName = "test_mow_compact_multi_segments"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string: [:]]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
+
+    // Restore `paramName` on every BE to the value previously recorded by get_be_param.
+    // A BE with no recorded value is skipped: formatting a missing value would post the
+    // literal string "null" to /api/update_config, and a BE that was never recorded at all
+    // would throw here. This closure runs from the onFinish hook, so it must not fail
+    // just because the suite aborted before get_be_param ran.
+    def reset_be_param = { paramName ->
+        for (String id in backendId_to_backendIP.keySet()) {
+            def original_value = backendId_to_params.get(id)?.get(paramName)
+            if (original_value == null) {
+                logger.info("skip resetting ${paramName} on be ${id}: no original value recorded")
+                continue
+            }
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    // Ask every BE (GET /api/show_config) for `paramName` and store the reported value in
+    // backendId_to_params[id][paramName], so that reset_be_param can restore it later.
+    def get_be_param = { paramName ->
+        for (String id in backendId_to_backendIP.keySet()) {
+            def showConfigUrl = String.format("http://%s:%s/api/show_config?conf_item=%s",
+                    backendId_to_backendIP.get(id), backendId_to_backendHttpPort.get(id), paramName)
+            def (code, out, err) = curl("GET", showConfigUrl)
+            assertTrue(code == 0)
+            assertTrue(out.contains(paramName))
+            // the first element of the parsed response must be a 4-field entry
+            def configEntry = parseJson(out)[0]
+            assertTrue(configEntry.size() == 4)
+            // index 2 is what gets remembered as the original value
+            def recordedValue = configEntry[2]
+            backendId_to_params.get(id, [:]).put(paramName, recordedValue)
+        }
+    }
+
+ // Fetch the tablet's compaction status (GET on tablet["CompactionStatus"]) and compare the
+ // segment count of one rowset against `lastRowsetSegmentNum`.
+ //   tablet               - row from `show tablets`; only the "CompactionStatus" URL is read
+ //   rowsetIndex          - 1-based position in the returned `rowsets` list
+ //   lastRowsetSegmentNum - expected number of segments for that rowset
+ //   enableAssert         - true: assert equality (nothing useful is returned);
+ //                          false (default): return the comparison result as a boolean
+ // NOTE(review): calls that omit `enableAssert` only get a boolean back; a caller that
+ // ignores that value checks nothing. Confirm this is intended at the bare call sites.
+ def getTabletStatus = { tablet, rowsetIndex, lastRowsetSegmentNum,
enableAssert = false ->
+ String compactionUrl = tablet["CompactionStatus"]
+ def (code, out, err) = curl("GET", compactionUrl)
+ logger.info("Show tablets status: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ assertTrue(tabletJson.rowsets.size() >= rowsetIndex)
+ def rowset = tabletJson.rowsets.get(rowsetIndex - 1)
+ logger.info("rowset: ${rowset}")
+ // the segment count is taken as the text between the first "]" and the first "DATA"
+ int start_index = rowset.indexOf("]")
+ int end_index = rowset.indexOf("DATA")
+ def segmentNumStr = rowset.substring(start_index + 1, end_index).trim()
+ logger.info("segmentNumStr: ${segmentNumStr}")
+ if (enableAssert) {
+ assertEquals(lastRowsetSegmentNum, Integer.parseInt(segmentNumStr))
+ } else {
+ return lastRowsetSegmentNum == Integer.parseInt(segmentNumStr);
+ }
+ }
+
+    // Run curl against the BE that hosts `tablet` to fetch
+    // /api/delete_bitmap/count_local (verbose) and return the parsed JSON response.
+    // Asserts that curl exits with code 0.
+    def getLocalDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        String command = "curl -X GET http://${be_host}:${be_http_port}/api/delete_bitmap/count_local?verbose=true&tablet_id=${tablet_id}"
+        logger.info(command)
+        def process = command.execute()
+        // Read stdout to EOF before waiting for the exit code: if the child fills the
+        // stdout pipe while we are blocked in waitFor(), neither side can make progress.
+        def out = process.getText()
+        def code = process.waitFor()
+        logger.info("Get local delete bitmap count status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        return parseJson(out.trim())
+    }
+
+    // Poll /api/compaction/run_status for `tablet` once per second (through curl) until the
+    // BE reports a falsy run_status. Every poll asserts curl exit code 0 and status "success".
+    def waitForCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        String command = "curl -X GET http://${be_host}:${be_http_port}/api/compaction/run_status?tablet_id=${tablet_id}"
+        def running = true
+        do {
+            Thread.sleep(1000)
+            logger.info(command)
+            def process = command.execute()
+            // Drain stdout before waitFor(); waiting first can deadlock once the child
+            // has written more than the pipe buffer holds.
+            def out = process.getText()
+            def code = process.waitFor()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+ // batch_size is 4164 in csv_reader.cpp
+ // _batch_size is 8192 in vtablet_writer.cpp
+ onFinish {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ reset_be_param("doris_scanner_row_bytes")
+ reset_be_param("tablet_rowset_stale_sweep_time_sec")
+ }
+ GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+
GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+
GetDebugPoint().enableDebugPointForAllBEs("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset")
+ get_be_param("doris_scanner_row_bytes")
+ set_be_param("doris_scanner_row_bytes", "1")
+ get_be_param("tablet_rowset_stale_sweep_time_sec")
+ set_be_param("tablet_rowset_stale_sweep_time_sec", "0")
+
+ tableName = "test_compact_multi_segments_"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(11) NULL,
+ `k2` int(11) NULL,
+ `v3` int(11) NULL,
+ `v4` int(11) NULL
+ ) unique KEY(`k1`, `k2`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+ def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+ assertEquals(1, tablets.size())
+ def tablet = tablets[0]
+ String tablet_id = tablet.TabletId
+ def backend_id = tablet.BackendId
+
+ // load 1
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'compress_type', 'GZ'
+ file 'test_schema_change_add_key_column.csv.gz'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(8192, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ sql "sync"
+ def rowCount1 = sql """ select count() from ${tableName}; """
+ logger.info("rowCount1: ${rowCount1}")
+ // check generate 3 segments
+ getTabletStatus(tablet, 2, 3)
+
+ // trigger compaction
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id: "${tablet.TabletId}", start_version: 2, end_version:
2])
+ def (code, out, err) =
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ logger.info("compact json: " + compactJson)
+ // check generate 1 segments
+ for (int i = 0; i < 20; i++) {
+ if (getTabletStatus(tablet, 2, 1, false)) {
+ break
+ }
+ sleep(100)
+ }
+ getTabletStatus(tablet, 2, 1)
+ sql """ select * from ${tableName} limit 1; """
+
+ // load 2
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'compress_type', 'GZ'
+ file 'test_schema_change_add_key_column1.csv.gz'
+ time 10000 // limit inflight 10s
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(20480, json.NumberTotalRows)
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ sql "sync"
+ def rowCount2 = sql """ select count() from ${tableName}; """
+ logger.info("rowCount2: ${rowCount2}")
+ // check generate 6 segments
+ getTabletStatus(tablet, 3, 6)
+ def local_dm = getLocalDeleteBitmapStatus(tablet)
+ logger.info("local delete bitmap 1: " + local_dm)
+
+ // trigger compaction for load 2
+
GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets",
+ [tablet_id: "${tablet.TabletId}", start_version: 3, end_version:
3])
+ (code, out, err) =
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ compactJson = parseJson(out.trim())
+ logger.info("compact json: " + compactJson)
+ waitForCompaction(tablet)
+ // check generate 1 segments
+ for (int i = 0; i < 20; i++) {
+ if (getTabletStatus(tablet, 3, 1, false)) {
+ break
+ }
+ sleep(100)
+ }
+ getTabletStatus(tablet, 3, 1)
+
+
GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets")
// cloud
+
GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset")
// local
+ local_dm = getLocalDeleteBitmapStatus(tablet)
+ logger.info("local delete bitmap 2: " + local_dm)
+ assertEquals(1, local_dm["delete_bitmap_count"])
+}
diff --git
a/regression-test/suites/compaction/test_mow_delete_unused_rowset_dm_docker.groovy
b/regression-test/suites/compaction/test_mow_delete_unused_rowset_dm_docker.groovy
new file mode 100644
index 00000000000..53bc4f10e51
--- /dev/null
+++
b/regression-test/suites/compaction/test_mow_delete_unused_rowset_dm_docker.groovy
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.doris.regression.suite.ClusterOptions
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite("test_mow_delete_unused_rowset_dm_docker", "docker") {
+ logger.info("test_mow_delete_unused_rowset_dm_docker")
+ def options = new ClusterOptions()
+ options.cloudMode = false
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.enableDebugPoints()
+ options.feConfigs.add("enable_workload_group=false")
+ // beConfigs
+ options.beConfigs.add('compaction_promotion_version_count=5')
+ options.beConfigs.add('tablet_rowset_stale_sweep_time_sec=0')
+ options.beConfigs.add('enable_mow_verbose_log=true')
+ options.beConfigs.add('enable_java_support=false')
+
+ def testTable = "test_mow_delete_unused_rowset_dm_docker"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+
+    // Trigger a compaction of `tablet` on the BE that hosts it and return the raw response
+    // body. `compact_type` is fixed to "cumulative" below, so only the first branch runs
+    // today; the "full" branch stays available for switching the type by hand.
+    def triggerCompaction = { tablet ->
+        def compact_type = "cumulative"
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        if (compact_type == "cumulative") {
+            def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+            assertEquals(code_1, 0)
+            return out_1
+        } else if (compact_type == "full") {
+            def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+            assertEquals(code_2, 0)
+            return out_2
+        } else {
+            // unknown compaction type: fail the test (Groovy's boolean literal is `true`;
+            // `True` is an undefined name)
+            assertFalse(true)
+        }
+    }
+
+    // Run curl against the BE that hosts `tablet` to fetch /api/compaction/show and return
+    // the parsed JSON response. Asserts that curl exits with code 0.
+    def getTabletStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        String command = "curl -X GET http://${be_host}:${be_http_port}/api/compaction/show?tablet_id=${tablet_id}"
+        logger.info(command)
+        def process = command.execute()
+        // Read stdout to EOF before waiting for the exit code: if the child fills the
+        // stdout pipe while we are blocked in waitFor(), neither side can make progress.
+        def out = process.getText()
+        def code = process.waitFor()
+        logger.info("Get tablet status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        return parseJson(out.trim())
+    }
+
+    // Poll /api/compaction/run_status for `tablet` once per second (through curl) until the
+    // BE reports a falsy run_status. Every poll asserts curl exit code 0 and status "success".
+    def waitForCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        String command = "curl -X GET http://${be_host}:${be_http_port}/api/compaction/run_status?tablet_id=${tablet_id}"
+        def running = true
+        do {
+            Thread.sleep(1000)
+            logger.info(command)
+            def process = command.execute()
+            // Drain stdout before waitFor(); waiting first can deadlock once the child
+            // has written more than the pipe buffer holds.
+            def out = process.getText()
+            def code = process.waitFor()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    // Run curl against the BE that hosts `tablet` to fetch
+    // /api/delete_bitmap/count_local (verbose) and return the parsed JSON response.
+    // Asserts that curl exits with code 0.
+    def getLocalDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        String command = "curl -X GET http://${be_host}:${be_http_port}/api/delete_bitmap/count_local?verbose=true&tablet_id=${tablet_id}"
+        logger.info(command)
+        def process = command.execute()
+        // Read stdout to EOF before waiting for the exit code: if the child fills the
+        // stdout pipe while we are blocked in waitFor(), neither side can make progress.
+        def out = process.getText()
+        def code = process.waitFor()
+        logger.info("Get local delete bitmap count status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        return parseJson(out.trim())
+    }
+
+ docker(options) {
+ sql """ DROP TABLE IF EXISTS ${testTable} """
+ sql """
+ create table ${testTable} (`k` int NOT NULL, `v` int NOT NULL)
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+ getBackendIpHttpPort(backendId_to_backendIP,
backendId_to_backendHttpPort);
+
+ def tablets = sql_return_maparray """ show tablets from ${testTable};
"""
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ def tablet = tablets[0]
+
+
GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+
GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset")
+
+ // 1. write some data
+ sql """ INSERT INTO ${testTable} VALUES (1,98); """
+ sql """ INSERT INTO ${testTable} VALUES (1,99),(2,99); """
+ sql """ INSERT INTO ${testTable} VALUES (3,99); """
+ sql """ INSERT INTO ${testTable} VALUES (4,99); """
+ sql """ INSERT INTO ${testTable} VALUES (5,99); """
+ sql "sync"
+ order_qt_sql1 """ select * from ${testTable}; """
+
+ // 2. trigger compaction to generate base rowset
+ getTabletStatus(tablet)
+ assertTrue(triggerCompaction(tablet).contains("Success"))
+ waitForCompaction(tablet)
+ def tablet_status = getTabletStatus(tablet)
+ assertEquals(2, tablet_status["rowsets"].size())
+
+ // 3. wait for no delete bitmap and no stale rowsets
+ def local_dm = getLocalDeleteBitmapStatus(tablet)
+ assertEquals(0, local_dm["delete_bitmap_count"])
+ tablet_status = getTabletStatus(tablet)
+ assertEquals(0, tablet_status["stale_rowsets"].size())
+
+ // 3. write some data
+ sql """ INSERT INTO ${testTable} VALUES (1,100); """
+ sql """ INSERT INTO ${testTable} VALUES (1,101),(2,100); """
+ sql """ INSERT INTO ${testTable} VALUES (3,100); """
+ sql """ INSERT INTO ${testTable} VALUES (4,100); """
+ sql """ INSERT INTO ${testTable} VALUES (5,100); """
+ sql """ sync """
+ order_qt_sql2 "select * from ${testTable}"
+ tablet_status = getTabletStatus(tablet)
+ assertEquals(7, tablet_status["rowsets"].size())
+
+ // 4. trigger compaction
+
GetDebugPoint().enableDebugPointForAllBEs("StorageEngine::start_delete_unused_rowset.block")
+ assertTrue(triggerCompaction(tablet).contains("Success"))
+ waitForCompaction(tablet)
+ tablet_status = getTabletStatus(tablet)
+ assertEquals(3, tablet_status["rowsets"].size())
+
+ // 5. block delete unused rowset, there are delete bitmaps; wait for
no stale rowsets
+
GetDebugPoint().disableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset")
+ local_dm = getLocalDeleteBitmapStatus(tablet)
+ logger.info("local_dm 1: " + local_dm)
+ assertEquals(6, local_dm["delete_bitmap_count"])
+ tablet_status = getTabletStatus(tablet)
+ assertEquals(0, tablet_status["stale_rowsets"].size())
+
+ // 6. restart be. check delete bitmap count
+ cluster.restartBackends()
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status after restart: " + tablet_status)
+ for (int i = 0; i < 300; i++) {
+ local_dm = getLocalDeleteBitmapStatus(tablet)
+ if (local_dm["delete_bitmap_count"] == 5) {
+ break
+ }
+ sleep(20)
+ }
+ local_dm = getLocalDeleteBitmapStatus(tablet)
+ logger.info("local_dm 2: " + local_dm)
+ assertEquals(5, local_dm["delete_bitmap_count"])
+ order_qt_sql3 """ select * from ${testTable}; """
+
+ // 7. restart be again to check that the removal of the delete bitmap was persisted to local storage
+ cluster.restartBackends()
+ tablet_status = getTabletStatus(tablet)
+ logger.info("tablet status after restart2: " + tablet_status)
+ for (int i = 0; i < 300; i++) {
+ local_dm = getLocalDeleteBitmapStatus(tablet)
+ if (local_dm["delete_bitmap_count"] == 5) {
+ break
+ }
+ sleep(20)
+ }
+ local_dm = getLocalDeleteBitmapStatus(tablet)
+ logger.info("local_dm 3: " + local_dm)
+ assertEquals(5, local_dm["delete_bitmap_count"])
+ }
+}
diff --git
a/regression-test/suites/compaction/test_mow_stale_rowset_delete_bitmap.groovy
b/regression-test/suites/compaction/test_mow_stale_rowset_delete_bitmap.groovy
new file mode 100644
index 00000000000..b91a19784e6
--- /dev/null
+++
b/regression-test/suites/compaction/test_mow_stale_rowset_delete_bitmap.groovy
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// when rowsets are moved from stale to unused, their delete bitmaps are not deleted
+// when unused rowsets are deleted, their delete bitmaps are deleted as well
+suite("test_mow_stale_rowset_delete_bitmap", "nonConcurrent") {
+ def testTable = "test_mow_stale_rowset_delete_bitmap"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ def backendId_to_params = [string: [:]]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    // Apply `paramName=paramValue` on every known BE through POST /api/update_config and
+    // assert that each response contains "OK".
+    def set_be_param = { paramName, paramValue ->
+        for (String id in backendId_to_backendIP.keySet()) {
+            def updateUrl = String.format("http://%s:%s/api/update_config?%s=%s",
+                    backendId_to_backendIP.get(id), backendId_to_backendHttpPort.get(id), paramName, paramValue)
+            def (code, out, err) = curl("POST", updateUrl)
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    // Restore `paramName` on every BE to the value previously recorded by get_be_param.
+    // A BE with no recorded value is skipped: formatting a missing value would post the
+    // literal string "null" to /api/update_config, and a BE that was never recorded at all
+    // would throw here. This closure runs inside a `finally` block, where an exception
+    // would replace the original test failure.
+    def reset_be_param = { paramName ->
+        for (String id in backendId_to_backendIP.keySet()) {
+            def original_value = backendId_to_params.get(id)?.get(paramName)
+            if (original_value == null) {
+                logger.info("skip resetting ${paramName} on be ${id}: no original value recorded")
+                continue
+            }
+            def beIp = backendId_to_backendIP.get(id)
+            def bePort = backendId_to_backendHttpPort.get(id)
+            def (code, out, err) = curl("POST", String.format("http://%s:%s/api/update_config?%s=%s", beIp, bePort, paramName, original_value))
+            assertTrue(out.contains("OK"))
+        }
+    }
+
+    // Ask every BE (GET /api/show_config) for `paramName` and store the reported value in
+    // backendId_to_params[id][paramName], so that reset_be_param can restore it later.
+    def get_be_param = { paramName ->
+        for (String id in backendId_to_backendIP.keySet()) {
+            def showConfigUrl = String.format("http://%s:%s/api/show_config?conf_item=%s",
+                    backendId_to_backendIP.get(id), backendId_to_backendHttpPort.get(id), paramName)
+            def (code, out, err) = curl("GET", showConfigUrl)
+            assertTrue(code == 0)
+            assertTrue(out.contains(paramName))
+            // the first element of the parsed response must be a 4-field entry
+            def configEntry = parseJson(out)[0]
+            assertTrue(configEntry.size() == 4)
+            // index 2 is what gets remembered as the original value
+            def recordedValue = configEntry[2]
+            backendId_to_params.get(id, [:]).put(paramName, recordedValue)
+        }
+    }
+
+    // Trigger a compaction of `tablet` on the BE that hosts it and return the raw response
+    // body. `compact_type` is fixed to "cumulative" below, so only the first branch runs
+    // today; the "full" branch stays available for switching the type by hand.
+    def triggerCompaction = { tablet ->
+        def compact_type = "cumulative"
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        if (compact_type == "cumulative") {
+            def (code_1, out_1, err_1) = be_run_cumulative_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+            assertEquals(code_1, 0)
+            return out_1
+        } else if (compact_type == "full") {
+            def (code_2, out_2, err_2) = be_run_full_compaction(be_host, be_http_port, tablet_id)
+            logger.info("Run compaction: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+            assertEquals(code_2, 0)
+            return out_2
+        } else {
+            // unknown compaction type: fail the test (Groovy's boolean literal is `true`;
+            // `True` is an undefined name)
+            assertFalse(true)
+        }
+    }
+
+    // Run curl against the BE that hosts `tablet` to fetch /api/compaction/show and return
+    // the parsed JSON response. Asserts that curl exits with code 0.
+    def getTabletStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        String command = "curl -X GET http://${be_host}:${be_http_port}/api/compaction/show?tablet_id=${tablet_id}"
+        logger.info(command)
+        def process = command.execute()
+        // Read stdout to EOF before waiting for the exit code: if the child fills the
+        // stdout pipe while we are blocked in waitFor(), neither side can make progress.
+        def out = process.getText()
+        def code = process.waitFor()
+        logger.info("Get tablet status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        return parseJson(out.trim())
+    }
+
+    // Poll /api/compaction/run_status for `tablet` once per second (through curl) until the
+    // BE reports a falsy run_status. Every poll asserts curl exit code 0 and status "success".
+    def waitForCompaction = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        String command = "curl -X GET http://${be_host}:${be_http_port}/api/compaction/run_status?tablet_id=${tablet_id}"
+        def running = true
+        do {
+            Thread.sleep(1000)
+            logger.info(command)
+            def process = command.execute()
+            // Drain stdout before waitFor(); waiting first can deadlock once the child
+            // has written more than the pipe buffer holds.
+            def out = process.getText()
+            def code = process.waitFor()
+            logger.info("Get compaction status: code=" + code + ", out=" + out)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+    }
+
+    // Run curl against the BE that hosts `tablet` to fetch
+    // /api/delete_bitmap/count_local (verbose) and return the parsed JSON response.
+    // Asserts that curl exits with code 0.
+    def getLocalDeleteBitmapStatus = { tablet ->
+        String tablet_id = tablet.TabletId
+        String trigger_backend_id = tablet.BackendId
+        def be_host = backendId_to_backendIP[trigger_backend_id]
+        def be_http_port = backendId_to_backendHttpPort[trigger_backend_id]
+        String command = "curl -X GET http://${be_host}:${be_http_port}/api/delete_bitmap/count_local?verbose=true&tablet_id=${tablet_id}"
+        logger.info(command)
+        def process = command.execute()
+        // Read stdout to EOF before waiting for the exit code: if the child fills the
+        // stdout pipe while we are blocked in waitFor(), neither side can make progress.
+        def out = process.getText()
+        def code = process.waitFor()
+        logger.info("Get local delete bitmap count status: =" + code + ", out=" + out)
+        assertEquals(code, 0)
+        return parseJson(out.trim())
+    }
+
+ AtomicBoolean query_result = new AtomicBoolean(true)
+    // Scan the whole table once and clear `query_result` as soon as a key `k` shows up
+    // twice in the result set.
+    def query = {
+        logger.info("query start")
+        def rows = sql_return_maparray """ select * from ${testTable}; """
+        logger.info("query result: " + rows)
+        def seenKeys = new HashSet<>()
+        for (row in rows) {
+            // HashSet.add returns false when the key is already present
+            if (!seenKeys.add(row.k)) {
+                logger.info("find duplicate key: " + row.k)
+                query_result.set(false)
+                break
+            }
+        }
+        logger.info("query finish. query_result: " + query_result.get())
+    }
+
+ sql """ DROP TABLE IF EXISTS ${testTable} """
+ sql """
+ create table ${testTable} (`k` int NOT NULL, `v` int NOT NULL)
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH(`k`) BUCKETS 1
+ PROPERTIES (
+ "enable_unique_key_merge_on_write" = "true",
+ "replication_allocation" = "tag.location.default: 1",
+ "disable_auto_compaction" = "true"
+ );
+ """
+
+ def tablets = sql_return_maparray """ show tablets from ${testTable}; """
+ logger.info("tablets: " + tablets)
+ assertEquals(1, tablets.size())
+ def tablet = tablets[0]
+
+ try {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ get_be_param("tablet_rowset_stale_sweep_time_sec")
+ set_be_param("tablet_rowset_stale_sweep_time_sec", "0")
+
+ // write some data
+ sql """ INSERT INTO ${testTable} VALUES (1,99); """
+ sql """ INSERT INTO ${testTable} VALUES (2,99); """
+ sql """ INSERT INTO ${testTable} VALUES (3,99); """
+ sql """ INSERT INTO ${testTable} VALUES (4,99),(5,0); """
+ sql """ INSERT INTO ${testTable} VALUES (5,99); """
+ sql "sync"
+ getTabletStatus(tablet)
+ getLocalDeleteBitmapStatus(tablet)
+
+ // trigger and block one query
+
GetDebugPoint().enableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+ Thread query_thread = new Thread(() -> query())
+ query_thread.start()
+ sleep(100)
+
+ // trigger compaction
+
GetDebugPoint().enableDebugPointForAllBEs("CumulativeCompaction.modify_rowsets.delete_expired_stale_rowset")
+
GetDebugPoint().enableDebugPointForAllBEs("Tablet.delete_expired_stale_rowset.start_delete_unused_rowset")
+ assertTrue(triggerCompaction(tablet).contains("Success"))
+ waitForCompaction(tablet)
+ // wait until the stale rowsets are deleted
+ for (int i = 0; i < 10; i++) {
+ def tablet_status = getTabletStatus(tablet)
+ if (tablet_status["rowsets"].size() <= 2 &&
tablet_status["stale_rowsets"].size() == 0) {
+ break
+ }
+ sleep(200)
+ }
+ getLocalDeleteBitmapStatus(tablet)
+
+ // unblock the query
+
GetDebugPoint().disableDebugPointForAllBEs("NewOlapScanner::_init_tablet_reader_params.block")
+ query_thread.join()
+ assertTrue(query_result.get(), "found duplicated keys")
+
+ // wait until the delete bitmaps of the unused rowsets are deleted
+
GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.vacuum_stale_rowsets")
// cloud
+
GetDebugPoint().enableDebugPointForAllBEs("DeleteBitmapAction._handle_show_local_delete_bitmap_count.start_delete_unused_rowset")
// local
+ for (int i = 0; i < 20; i++) {
+ def local_delete_bitmap_status = getLocalDeleteBitmapStatus(tablet)
+ if (local_delete_bitmap_status["delete_bitmap_count"] == 0) {
+ break
+ }
+ sleep(100)
+ }
+ } finally {
+ reset_be_param("tablet_rowset_stale_sweep_time_sec")
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]