This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9d8c32d5af4 [Improve](Variant) only `merge_schema` when `sync_tablets`
or scan in… (#48570)
9d8c32d5af4 is described below
commit 9d8c32d5af451bfcf6c4bef443fcf5e8bb62e80f
Author: lihangyu <[email protected]>
AuthorDate: Thu Mar 27 15:56:16 2025 +0800
[Improve](Variant) only `merge_schema` when `sync_tablets` or scan in cloud mode (#48570)
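1. Refactor the `sync_rowsets` / `sync_tablet_rowsets` parameters into a `SyncOptions` struct.
2. Set `merge_schema` only when syncing from `sync_tablets` or when preparing tablets for a scan.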
This reduces the cost of `merge_schema`, especially for merge-on-write (MOW) tables
with variant columns that have a large number of subcolumns.
---
be/src/cloud/cloud_backend_service.cpp | 4 +++-
be/src/cloud/cloud_meta_mgr.cpp | 17 ++++++++++-------
be/src/cloud/cloud_meta_mgr.h | 4 ++--
be/src/cloud/cloud_schema_change_job.cpp | 4 +++-
be/src/cloud/cloud_tablet.cpp | 12 ++++++------
be/src/cloud/cloud_tablet.h | 10 +++++++++-
be/src/cloud/cloud_tablet_mgr.cpp | 12 ++++++++----
be/src/http/action/calc_file_crc_action.cpp | 2 +-
be/src/http/action/delete_bitmap_action.cpp | 6 +++++-
be/src/pipeline/exec/olap_scan_operator.cpp | 5 ++++-
be/src/service/point_query_executor.cpp | 4 +++-
11 files changed, 54 insertions(+), 26 deletions(-)
diff --git a/be/src/cloud/cloud_backend_service.cpp
b/be/src/cloud/cloud_backend_service.cpp
index 265e6c44aac..63f76632d79 100644
--- a/be/src/cloud/cloud_backend_service.cpp
+++ b/be/src/cloud/cloud_backend_service.cpp
@@ -68,7 +68,9 @@ void
CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&,
if (!result.has_value()) {
return;
}
- Status st = result.value()->sync_rowsets(-1, true);
+ SyncOptions options;
+ options.warmup_delta_data = true;
+ Status st = result.value()->sync_rowsets(options);
if (!st.ok()) {
LOG_WARNING("failed to sync load for tablet").error(st);
}
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index d6ac44e5d08..2ab18b9ad9d 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -481,8 +481,7 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id,
TabletMetaSharedPtr* tab
return Status::OK();
}
-Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool
warmup_delta_data,
- bool sync_delete_bitmap, bool
full_sync) {
+Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, const
SyncOptions& options) {
using namespace std::chrono;
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudMetaMgr::sync_tablet_rowsets",
Status::OK(), tablet);
@@ -509,7 +508,7 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet*
tablet, bool warmup_delta_
idx->set_partition_id(tablet->partition_id());
{
std::shared_lock rlock(tablet->get_header_lock());
- if (full_sync) {
+ if (options.full_sync) {
req.set_start_version(0);
} else {
req.set_start_version(tablet->max_version_unlocked() + 1);
@@ -569,12 +568,13 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet*
tablet, bool warmup_delta_
// If is mow, the tablet has no delete bitmap in base rowsets.
// So dont need to sync it.
- if (sync_delete_bitmap && tablet->enable_unique_key_merge_on_write() &&
+ if (options.sync_delete_bitmap &&
tablet->enable_unique_key_merge_on_write() &&
tablet->tablet_state() == TABLET_RUNNING) {
DeleteBitmap delete_bitmap(tablet_id);
int64_t old_max_version = req.start_version() - 1;
auto st = sync_tablet_delete_bitmap(tablet, old_max_version,
resp.rowset_meta(),
- resp.stats(), req.idx(),
&delete_bitmap, full_sync);
+ resp.stats(), req.idx(),
&delete_bitmap,
+ options.full_sync);
if (st.is<ErrorCode::ROWSETS_EXPIRED>() && tried++ < retry_times) {
LOG_WARNING("rowset meta is expired, need to retry")
.tag("tablet", tablet->tablet_id())
@@ -679,8 +679,11 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet*
tablet, bool warmup_delta_
// after doing EMPTY_CUMULATIVE compaction, MS cp is 13,
get_rowset will return [2-11][12-12].
bool version_overlap =
tablet->max_version_unlocked() >=
rowsets.front()->start_version();
- tablet->add_rowsets(std::move(rowsets), version_overlap,
wlock, warmup_delta_data);
- RETURN_IF_ERROR(tablet->merge_rowsets_schema());
+ tablet->add_rowsets(std::move(rowsets), version_overlap, wlock,
+ options.warmup_delta_data);
+ if (options.merge_schema) {
+ RETURN_IF_ERROR(tablet->merge_rowsets_schema());
+ }
}
tablet->last_base_compaction_success_time_ms =
stats.last_base_compaction_time_ms();
tablet->last_cumu_compaction_success_time_ms =
stats.last_cumu_compaction_time_ms();
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index da82e025da2..1dd09de3705 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -24,6 +24,7 @@
#include <variant>
#include <vector>
+#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "olap/rowset/rowset_meta.h"
#include "util/s3_util.h"
@@ -62,8 +63,7 @@ public:
Status get_schema_dict(int64_t index_id,
std::shared_ptr<SchemaCloudDictionary>* schema_dict);
- Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data =
false,
- bool sync_delete_bitmap = true, bool full_sync
= false);
+ Status sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options
= {});
Status prepare_rowset(const RowsetMeta& rs_meta,
std::shared_ptr<RowsetMeta>* existed_rs_meta =
nullptr);
diff --git a/be/src/cloud/cloud_schema_change_job.cpp
b/be/src/cloud/cloud_schema_change_job.cpp
index 5571ee166ba..514abed08d4 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -90,7 +90,9 @@ Status CloudSchemaChangeJob::process_alter_tablet(const
TAlterTabletReqV2& reque
request.base_tablet_id);
}
// MUST sync rowsets before capturing rowset readers and building
DeleteHandler
- RETURN_IF_ERROR(_base_tablet->sync_rowsets(request.alter_version));
+ SyncOptions options;
+ options.query_version = request.alter_version;
+ RETURN_IF_ERROR(_base_tablet->sync_rowsets(options));
// ATTN: Only convert rowsets of version larger than 1, MUST let the new
tablet cache have rowset [0-1]
_output_cumulative_point = _base_tablet->cumulative_layer_point();
std::vector<RowSetSplits> rs_splits;
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index f3203afd76d..692bf0a84c5 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -144,26 +144,26 @@ Status CloudTablet::merge_rowsets_schema() {
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
// This function will erase the tablet from `CloudTabletMgr` when it can't
find this tablet in MS.
-Status CloudTablet::sync_rowsets(int64_t query_version, bool
warmup_delta_data) {
+Status CloudTablet::sync_rowsets(const SyncOptions& options) {
RETURN_IF_ERROR(sync_if_not_running());
- if (query_version > 0) {
+ if (options.query_version > 0) {
std::shared_lock rlock(_meta_lock);
- if (_max_version >= query_version) {
+ if (_max_version >= options.query_version) {
return Status::OK();
}
}
// serially execute sync to reduce unnecessary network overhead
std::lock_guard lock(_sync_meta_lock);
- if (query_version > 0) {
+ if (options.query_version > 0) {
std::shared_lock rlock(_meta_lock);
- if (_max_version >= query_version) {
+ if (_max_version >= options.query_version) {
return Status::OK();
}
}
- auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data);
+ auto st = _engine.meta_mgr().sync_tablet_rowsets(this, options);
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 84664bb37da..03ba47b27a9 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -26,6 +26,14 @@ namespace doris {
class CloudStorageEngine;
+struct SyncOptions {
+ bool warmup_delta_data = false;
+ bool sync_delete_bitmap = true;
+ bool full_sync = false;
+ bool merge_schema = false;
+ int64_t query_version = -1;
+};
+
class CloudTablet final : public BaseTablet {
public:
CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta);
@@ -68,7 +76,7 @@ public:
// If `query_version` > 0 and local max_version of the tablet >=
`query_version`, do nothing.
// If 'need_download_data_async' is true, it means that we need to
download the new version
// rowsets datum async.
- Status sync_rowsets(int64_t query_version = -1, bool warmup_delta_data =
false);
+ Status sync_rowsets(const SyncOptions& options = {});
// Synchronize the tablet meta from meta service.
Status sync_meta();
diff --git a/be/src/cloud/cloud_tablet_mgr.cpp
b/be/src/cloud/cloud_tablet_mgr.cpp
index 04a1c33d5c3..9744626af6f 100644
--- a/be/src/cloud/cloud_tablet_mgr.cpp
+++ b/be/src/cloud/cloud_tablet_mgr.cpp
@@ -180,8 +180,10 @@ Result<std::shared_ptr<CloudTablet>>
CloudTabletMgr::get_tablet(int64_t tablet_i
auto tablet = std::make_shared<CloudTablet>(_engine,
std::move(tablet_meta));
auto value = std::make_unique<Value>(tablet, *_tablet_map);
// MUST sync stats to let compaction scheduler work correctly
- st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(),
warmup_data,
- sync_delete_bitmap);
+ SyncOptions options;
+ options.warmup_delta_data = warmup_data;
+ options.sync_delete_bitmap = sync_delete_bitmap;
+ st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), options);
if (!st.ok()) {
LOG(WARNING) << "failed to sync tablet " << tablet_id << ": "
<< st;
return nullptr;
@@ -289,8 +291,10 @@ void CloudTabletMgr::sync_tablets(const CountDownLatch&
stop_latch) {
continue;
}
}
-
- st = tablet->sync_rowsets(-1);
+ SyncOptions options;
+ options.query_version = -1;
+ options.merge_schema = true;
+ st = tablet->sync_rowsets(options);
if (!st) {
LOG_WARNING("failed to sync tablet rowsets {}",
tablet->tablet_id()).error(st);
}
diff --git a/be/src/http/action/calc_file_crc_action.cpp
b/be/src/http/action/calc_file_crc_action.cpp
index 123f55dd7fd..64433a4aa54 100644
--- a/be/src/http/action/calc_file_crc_action.cpp
+++ b/be/src/http/action/calc_file_crc_action.cpp
@@ -64,7 +64,7 @@ Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req,
uint32_t* crc_value
if (auto cloudEngine = dynamic_cast<CloudStorageEngine*>(&_engine)) {
tablet = DORIS_TRY(cloudEngine->get_tablet(tablet_id));
// sync all rowsets
-
RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets(-1));
+
RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(tablet)->sync_rowsets());
} else if (auto storageEngine = dynamic_cast<StorageEngine*>(&_engine)) {
auto tabletPtr =
storageEngine->tablet_manager()->get_tablet(tablet_id);
tablet = std::dynamic_pointer_cast<Tablet>(tabletPtr);
diff --git a/be/src/http/action/delete_bitmap_action.cpp
b/be/src/http/action/delete_bitmap_action.cpp
index 2fa0a73c2f3..b5738475710 100644
--- a/be/src/http/action/delete_bitmap_action.cpp
+++ b/be/src/http/action/delete_bitmap_action.cpp
@@ -135,7 +135,11 @@ Status
DeleteBitmapAction::_handle_show_ms_delete_bitmap_count(HttpRequest* req,
return st;
}
auto tablet = std::make_shared<CloudTablet>(_engine.to_cloud(),
std::move(tablet_meta));
- st = _engine.to_cloud().meta_mgr().sync_tablet_rowsets(tablet.get(),
false, true, true);
+ SyncOptions options;
+ options.warmup_delta_data = false;
+ options.sync_delete_bitmap = true;
+ options.full_sync = true;
+ st = _engine.to_cloud().meta_mgr().sync_tablet_rowsets(tablet.get(),
options);
if (!st.ok()) {
LOG(WARNING) << "failed to sync tablet=" << tablet_id << ", st=" << st;
return st;
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index a81cd2df436..b93c22274d9 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -445,8 +445,11 @@ Status OlapScanLocalState::hold_tablets() {
tasks.reserve(_scan_ranges.size());
for (auto&& [cur_tablet, cur_version] : _tablets) {
tasks.emplace_back([cur_tablet, cur_version]() {
+ SyncOptions options;
+ options.query_version = cur_version;
+ options.merge_schema = true;
return std::dynamic_pointer_cast<CloudTablet>(cur_tablet)
- ->sync_rowsets(cur_version);
+ ->sync_rowsets(options);
});
}
RETURN_IF_ERROR(cloud::bthread_fork_join(tasks, 10));
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index dc50ef9937f..12975fbf9fe 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -404,7 +404,9 @@ Status PointQueryExecutor::_lookup_row_key() {
Status st;
if (_version >= 0) {
CHECK(config::is_cloud_mode()) << "Only cloud mode support snapshot
read at present";
-
RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(_version));
+ SyncOptions options;
+ options.query_version = _version;
+
RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablet)->sync_rowsets(options));
}
std::vector<RowsetSharedPtr> specified_rowsets;
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]