This is an automated email from the ASF dual-hosted git repository.
eldenmoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ce917565fe9 [refactor](variant) remove some useless code (#54663)
ce917565fe9 is described below
commit ce917565fe9503d3601685f8c10edb161b4298cd
Author: lihangyu <[email protected]>
AuthorDate: Wed Aug 13 21:13:30 2025 +0800
[refactor](variant) remove some useless code (#54663)
---
be/src/cloud/cloud_meta_mgr.cpp | 127 +----------
be/src/cloud/cloud_meta_mgr.h | 2 -
be/src/cloud/cloud_rowset_writer.cpp | 14 +-
be/src/cloud/cloud_storage_engine.cpp | 3 -
be/src/cloud/cloud_storage_engine.h | 6 +-
be/src/cloud/cloud_tablet.cpp | 35 ---
be/src/cloud/cloud_tablet.h | 9 -
be/src/cloud/schema_cloud_dictionary_cache.cpp | 252 ---------------------
be/src/cloud/schema_cloud_dictionary_cache.h | 101 ---------
be/src/olap/base_tablet.cpp | 27 +--
be/src/olap/base_tablet.h | 8 -
be/src/olap/rowset/beta_rowset_writer.cpp | 50 +---
be/src/olap/rowset/beta_rowset_writer.h | 7 +-
be/src/olap/rowset/beta_rowset_writer_v2.cpp | 5 +-
be/src/olap/rowset/beta_rowset_writer_v2.h | 3 +-
be/src/olap/rowset/rowset_writer.h | 3 +-
be/src/olap/rowset/rowset_writer_context.h | 2 -
be/src/olap/rowset/segment_creator.cpp | 13 +-
be/src/olap/rowset/segment_creator.h | 10 +-
be/src/olap/rowset/segment_v2/segment_writer.h | 4 -
.../rowset/segment_v2/vertical_segment_writer.h | 5 -
be/src/olap/rowset_builder.cpp | 19 --
be/src/runtime/load_stream.cpp | 9 +-
be/src/runtime/load_stream_writer.cpp | 5 +-
be/src/runtime/load_stream_writer.h | 2 +-
be/src/runtime/memory/cache_policy.h | 3 -
be/src/service/internal_service.cpp | 9 +-
be/src/vec/sink/load_stream_stub.cpp | 6 +-
be/src/vec/sink/load_stream_stub.h | 3 +-
.../cloud/test_schema_cloud_dictionary_cache.cpp | 232 -------------------
cloud/src/meta-service/meta_service_schema.cpp | 1 +
31 files changed, 47 insertions(+), 928 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index fc1fc5bfd16..9c78a7529f1 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -42,7 +42,6 @@
#include "cloud/cloud_warm_up_manager.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
-#include "cloud/schema_cloud_dictionary_cache.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
@@ -445,49 +444,6 @@ Status retry_rpc(std::string_view op_name, const Request& req, Response* res,
return Status::RpcError("failed to {}: rpc timeout, last msg={}", op_name,
error_msg);
}
-Status fill_schema_with_dict(const RowsetMetaCloudPB& in, RowsetMetaPB* out,
- const SchemaCloudDictionary& dict) {
- std::unordered_map<int32_t, ColumnPB*> unique_id_map;
- //init map
- for (ColumnPB& column : *out->mutable_tablet_schema()->mutable_column()) {
- unique_id_map[column.unique_id()] = &column;
- }
- // column info
- for (int i = 0; i < in.schema_dict_key_list().column_dict_key_list_size();
++i) {
- int dict_key = in.schema_dict_key_list().column_dict_key_list(i);
- if (dict.column_dict().find(dict_key) == dict.column_dict().end()) {
- return Status::NotFound("Not found entry {}", dict_key);
- }
- const ColumnPB& dict_val = dict.column_dict().at(dict_key);
- ColumnPB& to_add = *out->mutable_tablet_schema()->add_column();
- to_add = dict_val;
- VLOG_DEBUG << "fill dict column " << dict_val.ShortDebugString();
- }
-
- // index info
- for (int i = 0; i <
in.schema_dict_key_list().index_info_dict_key_list_size(); ++i) {
- int dict_key = in.schema_dict_key_list().index_info_dict_key_list(i);
- if (dict.index_dict().find(dict_key) == dict.index_dict().end()) {
- return Status::NotFound("Not found entry {}", dict_key);
- }
- const doris::TabletIndexPB& dict_val = dict.index_dict().at(dict_key);
- *out->mutable_tablet_schema()->add_index() = dict_val;
- VLOG_DEBUG << "fill dict index " << dict_val.ShortDebugString();
- }
-
- // sparse column info
- for (int i = 0; i <
in.schema_dict_key_list().sparse_column_dict_key_list_size(); ++i) {
- int dict_key =
in.schema_dict_key_list().sparse_column_dict_key_list(i);
- if (dict.column_dict().find(dict_key) == dict.column_dict().end()) {
- return Status::NotFound("Not found entry {}", dict_key);
- }
- const ColumnPB& dict_val = dict.column_dict().at(dict_key);
- *unique_id_map.at(dict_val.parent_unique_id())->add_sparse_columns() =
dict_val;
- VLOG_DEBUG << "fill dict sparse column" << dict_val.ShortDebugString();
- }
- return Status::OK();
-}
-
} // namespace
Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr*
tablet_meta) {
@@ -562,10 +518,6 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
req.set_cumulative_point(tablet->cumulative_layer_point());
}
req.set_end_version(-1);
- // backend side use schema dict in cache if enable cloud schema dict cache
- req.set_schema_op(config::variant_use_cloud_schema_dict_cache
- ? GetRowsetRequest::NO_DICT
- : GetRowsetRequest::RETURN_DICT);
VLOG_DEBUG << "send GetRowsetRequest: " << req.ShortDebugString();
auto start = std::chrono::steady_clock::now();
stub->get_rowset(&cntl, &req, &resp, nullptr);
@@ -766,30 +718,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
existed_rowset->rowset_id().to_string() ==
cloud_rs_meta_pb.rowset_id_v2()) {
continue; // Same rowset, skip it
}
- RowsetMetaPB meta_pb;
- // Check if the rowset meta contains a schema dictionary key
list.
- if (cloud_rs_meta_pb.has_schema_dict_key_list() &&
!resp.has_schema_dict()) {
- // Use the locally cached dictionary.
- RowsetMetaCloudPB copied_cloud_rs_meta_pb =
cloud_rs_meta_pb;
- CloudStorageEngine& engine =
-
ExecEnv::GetInstance()->storage_engine().to_cloud();
- {
- wlock.unlock();
- RETURN_IF_ERROR(
- engine.get_schema_cloud_dictionary_cache()
-
.replace_dict_keys_to_schema(cloud_rs_meta_pb.index_id(),
-
&copied_cloud_rs_meta_pb));
- wlock.lock();
- }
- meta_pb =
cloud_rowset_meta_to_doris(copied_cloud_rs_meta_pb);
- } else {
- // Otherwise, use the schema dictionary from the response
(if available).
- meta_pb = cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
- if (resp.has_schema_dict()) {
-
RETURN_IF_ERROR(fill_schema_with_dict(cloud_rs_meta_pb, &meta_pb,
-
resp.schema_dict()));
- }
- }
+ RowsetMetaPB meta_pb =
cloud_rowset_meta_to_doris(cloud_rs_meta_pb);
auto rs_meta = std::make_shared<RowsetMeta>();
rs_meta->init_from_pb(meta_pb);
RowsetSharedPtr rowset;
@@ -809,9 +738,6 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
tablet->max_version_unlocked() >=
rowsets.front()->start_version();
tablet->add_rowsets(std::move(rowsets), version_overlap, wlock,
options.warmup_delta_data);
- if (options.merge_schema) {
- RETURN_IF_ERROR(tablet->merge_rowsets_schema());
- }
}
tablet->last_base_compaction_success_time_ms =
stats.last_base_compaction_time_ms();
tablet->last_cumu_compaction_success_time_ms =
stats.last_cumu_compaction_time_ms();
@@ -1051,21 +977,6 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
RowsetMetaPB rs_meta_pb = rs_meta.get_rowset_pb();
doris_rowset_meta_to_cloud(req.mutable_rowset_meta(),
std::move(rs_meta_pb));
- // Replace schema dictionary keys based on the rowset's index ID to
maintain schema consistency.
- CloudStorageEngine& engine =
ExecEnv::GetInstance()->storage_engine().to_cloud();
- // if not enable dict cache, then directly return true to avoid refresh
- Status replaced_st =
- config::variant_use_cloud_schema_dict_cache
- ?
engine.get_schema_cloud_dictionary_cache().replace_schema_to_dict_keys(
- rs_meta_pb.index_id(), req.mutable_rowset_meta())
- : Status::OK();
- // if the replaced_st is not ok and alse not NotFound, then we need to
just return the replaced_st
- VLOG_DEBUG << "replace schema to dict keys, replaced_st: " <<
replaced_st.to_string()
- << ", replaced_st.is<ErrorCode::NOT_FOUND>(): "
- << replaced_st.is<ErrorCode::NOT_FOUND>();
- if (!replaced_st.ok() && !replaced_st.is<ErrorCode::NOT_FOUND>()) {
- return replaced_st;
- }
Status st = retry_rpc("commit rowset", req, &resp,
&MetaService_Stub::commit_rowset);
if (!st.ok() && resp.status().code() == MetaServiceCode::ALREADY_EXISTED) {
if (existed_rs_meta != nullptr && resp.has_existed_rowset_meta()) {
@@ -1076,14 +987,6 @@ Status CloudMetaMgr::commit_rowset(RowsetMeta& rs_meta, const std::string& job_i
}
return Status::AlreadyExist("failed to commit rowset: {}",
resp.status().msg());
}
- // If dictionary replacement fails, it may indicate that the local schema dictionary is outdated.
- // Refreshing the dictionary here ensures that the rowset metadata is updated with the latest schema definitions,
- // which is critical for maintaining consistency between the rowset and its corresponding schema.
- if (replaced_st.is<ErrorCode::NOT_FOUND>()) {
- RETURN_IF_ERROR(
-
engine.get_schema_cloud_dictionary_cache().refresh_dict(rs_meta_pb.index_id()));
- }
-
int64_t timeout_ms = -1;
// if the `job_id` is not empty, it means this rowset was produced by a
compaction job.
if (config::enable_compaction_delay_commit_for_warm_up && !job_id.empty())
{
@@ -1686,33 +1589,5 @@ int64_t CloudMetaMgr::get_inverted_index_file_szie(const RowsetMeta& rs_meta) {
return total_inverted_index_size;
}
-Status CloudMetaMgr::get_schema_dict(int64_t index_id,
- std::shared_ptr<SchemaCloudDictionary>*
schema_dict) {
- VLOG_DEBUG << "Sending GetSchemaDictRequest, index_id: " << index_id;
-
- // Create the request and response objects.
- GetSchemaDictRequest req;
- GetSchemaDictResponse resp;
- req.set_cloud_unique_id(config::cloud_unique_id);
- req.set_index_id(index_id);
-
- // Invoke RPC via the retry_rpc helper function.
- // It will call the MetaService_Stub::get_schema_dict method.
- Status st = retry_rpc("get schema dict", req, &resp,
&MetaService_Stub::get_schema_dict);
- if (!st.ok()) {
- return st;
- }
-
- // Optionally, additional checking of the response status can be done here.
- // For example, if the returned status code indicates a parsing or not found error,
- // you may return an error accordingly.
-
- // Copy the retrieved schema dictionary from the response.
- *schema_dict = std::make_shared<SchemaCloudDictionary>();
- (*schema_dict)->Swap(resp.mutable_schema_dict());
- VLOG_DEBUG << "Successfully obtained schema dict, index_id: " << index_id;
- return Status::OK();
-}
-
#include "common/compile_check_end.h"
} // namespace doris::cloud
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index b8749bbcadd..30b5ecd581c 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -71,8 +71,6 @@ public:
Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>*
tablet_meta);
- Status get_schema_dict(int64_t index_id,
std::shared_ptr<SchemaCloudDictionary>* schema_dict);
-
Status sync_tablet_rowsets(CloudTablet* tablet, const SyncOptions& options
= {},
SyncRowsetStats* sync_stats = nullptr);
Status sync_tablet_rowsets_unlocked(
diff --git a/be/src/cloud/cloud_rowset_writer.cpp b/be/src/cloud/cloud_rowset_writer.cpp
index ebc411697ee..fcf6115907e 100644
--- a/be/src/cloud/cloud_rowset_writer.cpp
+++ b/be/src/cloud/cloud_rowset_writer.cpp
@@ -100,10 +100,7 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
_rowset_meta->set_rowset_state(COMMITTED);
}
- // update rowset meta tablet schema if tablet schema updated
- auto rowset_schema = _context.merged_tablet_schema != nullptr ?
_context.merged_tablet_schema
- :
_context.tablet_schema;
- _rowset_meta->set_tablet_schema(rowset_schema);
+ _rowset_meta->set_tablet_schema(_context.tablet_schema);
if (_rowset_meta->newest_write_timestamp() == -1) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
@@ -115,7 +112,7 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
} else {
_rowset_meta->add_segments_file_size(seg_file_size.value());
}
- if (rowset_schema->has_inverted_index()) {
+ if (_context.tablet_schema->has_inverted_index()) {
if (auto idx_files_info =
_idx_files.inverted_index_file_info(_segment_start_id);
!idx_files_info.has_value()) [[unlikely]] {
LOG(ERROR) << "expected inverted index files info, but none
presents: "
@@ -125,9 +122,10 @@ Status CloudRowsetWriter::build(RowsetSharedPtr& rowset) {
}
}
- RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema,
_context.tablet_path,
- _rowset_meta,
&rowset),
- "rowset init failed when build new rowset");
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ RowsetFactory::create_rowset(_context.tablet_schema,
_context.tablet_path, _rowset_meta,
+ &rowset),
+ "rowset init failed when build new rowset");
_already_built = true;
return Status::OK();
}
diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp
index 266205287cf..be103aa3bbc 100644
--- a/be/src/cloud/cloud_storage_engine.cpp
+++ b/be/src/cloud/cloud_storage_engine.cpp
@@ -221,9 +221,6 @@ Status CloudStorageEngine::open() {
_tablet_hotspot = std::make_unique<TabletHotspot>();
- _schema_cloud_dictionary_cache =
-
std::make_unique<SchemaCloudDictionaryCache>(config::schema_dict_cache_capacity);
-
_cloud_snapshot_mgr = std::make_unique<CloudSnapshotMgr>(*this);
RETURN_NOT_OK_STATUS_WITH_WARN(
diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h
index 69cfb696950..f513bfbdcd7 100644
--- a/be/src/cloud/cloud_storage_engine.h
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -26,7 +26,6 @@
#include "cloud/cloud_cumulative_compaction_policy.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
-#include "cloud/schema_cloud_dictionary_cache.h"
#include "cloud_txn_delete_bitmap_cache.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/compaction.h"
@@ -75,9 +74,7 @@ public:
CloudSnapshotMgr& cloud_snapshot_mgr() { return *_cloud_snapshot_mgr; }
CloudTxnDeleteBitmapCache& txn_delete_bitmap_cache() const { return
*_txn_delete_bitmap_cache; }
- SchemaCloudDictionaryCache& get_schema_cloud_dictionary_cache() {
- return *_schema_cloud_dictionary_cache;
- }
+
ThreadPool& calc_tablet_delete_bitmap_task_thread_pool() const {
return *_calc_tablet_delete_bitmap_task_thread_pool;
}
@@ -180,7 +177,6 @@ private:
std::unique_ptr<CloudTabletMgr> _tablet_mgr;
std::unique_ptr<CloudTxnDeleteBitmapCache> _txn_delete_bitmap_cache;
std::unique_ptr<ThreadPool> _calc_tablet_delete_bitmap_task_thread_pool;
- std::unique_ptr<SchemaCloudDictionaryCache> _schema_cloud_dictionary_cache;
// Components for cache warmup
std::unique_ptr<io::FileCacheBlockDownloader> _file_cache_block_downloader;
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index 3c85041ad0e..364d0ad480d 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -152,36 +152,6 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}
-Status CloudTablet::merge_rowsets_schema() {
- // Find the rowset with the max version
- auto max_version_rowset =
- std::max_element(
- _rs_version_map.begin(), _rs_version_map.end(),
- [](const auto& a, const auto& b) {
- return !a.second->tablet_schema()
- ? true
- : (!b.second->tablet_schema()
- ? false
- :
a.second->tablet_schema()->schema_version() <
-
b.second->tablet_schema()
-
->schema_version());
- })
- ->second;
- TabletSchemaSPtr max_version_schema = max_version_rowset->tablet_schema();
- // If the schema has variant columns, perform a merge to create a wide
tablet schema
- if (max_version_schema->num_variant_columns() > 0) {
- std::vector<TabletSchemaSPtr> schemas;
- std::transform(_rs_version_map.begin(), _rs_version_map.end(),
std::back_inserter(schemas),
- [](const auto& rs_meta) { return
rs_meta.second->tablet_schema(); });
- // Merge the collected schemas to obtain the least common schema
-
RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas,
nullptr,
-
max_version_schema));
- VLOG_DEBUG << "dump schema: " <<
max_version_schema->dump_full_schema();
- _merged_tablet_schema = max_version_schema;
- }
- return Status::OK();
-}
-
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
// This function will erase the tablet from `CloudTabletMgr` when it can't
find this tablet in MS.
Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats*
stats) {
@@ -262,11 +232,6 @@ Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) {
return st;
}
-TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
- std::shared_lock rlock(_meta_lock);
- return _merged_tablet_schema;
-}
-
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool
version_overlap,
std::unique_lock<std::shared_mutex>& meta_lock,
bool warmup_delta_data) {
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index e7830fab105..69a086e9ba9 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -258,9 +258,6 @@ public:
const auto& rowset_map() const { return _rs_version_map; }
- // Merge all rowset schemas within a CloudTablet
- Status merge_rowsets_schema();
-
int64_t last_sync_time_s = 0;
int64_t last_load_time_ms = 0;
int64_t last_base_compaction_success_time_ms = 0;
@@ -272,9 +269,6 @@ public:
std::atomic<int64_t> remote_read_time_us = 0;
std::atomic<int64_t> exec_compaction_time_us = 0;
- // Return merged extended schema
- TabletSchemaSPtr merged_tablet_schema() const override;
-
void build_tablet_report_info(TTabletInfo* tablet_info);
// check that if the delete bitmap in delete bitmap cache has the same
cardinality with the expected_delete_bitmap's
@@ -352,9 +346,6 @@ private:
// signatures being executed concurrently, we use _rowset_update_lock to
serialize them
mutable std::mutex _rowset_update_lock;
- // Schema will be merged from all rowsets when sync_rowsets
- TabletSchemaSPtr _merged_tablet_schema;
-
// unused_rowsets, [start_version, end_version]
std::mutex _gc_mutex;
std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
diff --git a/be/src/cloud/schema_cloud_dictionary_cache.cpp b/be/src/cloud/schema_cloud_dictionary_cache.cpp
deleted file mode 100644
index 9fdde420ecb..00000000000
--- a/be/src/cloud/schema_cloud_dictionary_cache.cpp
+++ /dev/null
@@ -1,252 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "cloud/schema_cloud_dictionary_cache.h"
-
-#include <fmt/core.h>
-#include <gen_cpp/olap_file.pb.h>
-#include <vec/common/schema_util.h>
-
-#include <functional>
-#include <memory>
-#include <mutex>
-#include <unordered_map>
-
-#include "cloud/cloud_meta_mgr.h"
-#include "cloud/cloud_storage_engine.h"
-#include "cloud/cloud_tablet.h"
-#include "common/config.h"
-#include "gen_cpp/cloud.pb.h" // For GetSchemaDictResponse
-#include "runtime/exec_env.h"
-
-namespace doris {
-
-bvar::Adder<int64_t> g_schema_dict_cache_count("schema_dict_cache_count");
-bvar::Adder<int64_t> g_replace_dict_keys_to_schema_hit_cache(
- "schema_dict_cache_replace_dict_keys_to_schema_hit_count");
-bvar::Adder<int64_t> g_replace_schema_to_dict_keys_hit_cache(
- "schema_dict_cache_replace_schema_to_dict_keys_hit_count");
-bvar::Adder<int64_t>
g_schema_dict_cache_miss_count("schema_dict_cache_miss_count");
-bvar::Adder<int64_t> g_schema_dict_refresh_count("schema_dict_refresh_count");
-
-void SchemaCloudDictionaryCache::_insert(int64_t index_id, const
SchemaCloudDictionarySPtr& dict) {
- auto* value = new CacheValue;
- value->dict = dict;
- auto* lru_handle =
- LRUCachePolicy::insert(fmt::format("{}", index_id), value, 1, 0,
CachePriority::NORMAL);
- g_schema_dict_cache_count << 1;
- _cache->release(lru_handle);
-}
-
-SchemaCloudDictionarySPtr SchemaCloudDictionaryCache::_lookup(int64_t
index_id) {
- Cache::Handle* handle = LRUCachePolicy::lookup(fmt::format("{}",
index_id));
- if (!handle) {
- return nullptr;
- }
- auto* cache_val = static_cast<CacheValue*>(_cache->value(handle));
- SchemaCloudDictionarySPtr dict = cache_val ? cache_val->dict : nullptr;
- _cache->release(handle); // release handle but dict's shared_ptr still
alive
- return dict;
-}
-
-Status check_path_amibigus(const SchemaCloudDictionary& schema,
RowsetMetaCloudPB* rowset_meta) {
- // if enable_variant_flatten_nested is false, then we don't need to check
path amibigus
- if (!rowset_meta->tablet_schema().enable_variant_flatten_nested()) {
- return Status::OK();
- }
- // try to get all the paths in the rowset meta
- vectorized::PathsInData all_paths;
- for (const auto& column : rowset_meta->tablet_schema().column()) {
- vectorized::PathInData path_in_data;
- path_in_data.from_protobuf(column.column_path_info());
- all_paths.push_back(path_in_data);
- }
- // try to get all the paths in the schema dict
- for (const auto& [_, column] : schema.column_dict()) {
- vectorized::PathInData path_in_data;
- path_in_data.from_protobuf(column.column_path_info());
- all_paths.push_back(path_in_data);
- }
-
RETURN_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(all_paths));
- return Status::OK();
-}
-/**
- * Processes dictionary entries by matching items from the given item map.
- * It maps items to their dictionary keys, then adds these keys to the rowset
metadata.
- * If an item is missing in the dictionary, the dictionary key list in rowset
meta is cleared
- * and the function returns a NotFound status.
- *
- * @tparam ItemPB The protobuf message type for dictionary items (e.g.,
ColumnPB or TabletIndexPB).
- * @param dict The SchemaCloudDictionary that holds the dictionary entries.
- * @param item_dict A mapping from unique identifiers to the dictionary items.
- * @param result Pointer to a repeated field where filtered (non-extended)
items are stored. May be null.
- * @param items The repeated field of items in the original rowset meta.
- * @param filter A predicate that returns true if an item should be treated as
an extended item and skipped.
- * @param add_dict_key_fn A function to be called for each valid item that
adds its key to the rowset meta.
- * @param rowset_meta Pointer to the rowset metadata; it is cleared if any
item is not found.
- *
- * @return Status::OK if all items are processed successfully; otherwise, a
NotFound status.
- */
-template <typename ItemPB>
-Status process_dictionary(SchemaCloudDictionary& dict,
- const google::protobuf::Map<int32_t, ItemPB>&
item_dict,
- google::protobuf::RepeatedPtrField<ItemPB>* result,
- const google::protobuf::RepeatedPtrField<ItemPB>&
items,
- const std::function<bool(const ItemPB&)>& filter,
- const std::function<void(int32_t)>& add_dict_key_fn,
- RowsetMetaCloudPB* rowset_meta) {
- if (items.empty()) {
- return Status::OK();
- }
- // Use deterministic method to do serialization since structure like
- // `google::protobuf::Map`'s serialization is unstable
- auto serialize_fn = [](const ItemPB& item) -> std::string {
- std::string output;
- google::protobuf::io::StringOutputStream string_output_stream(&output);
- google::protobuf::io::CodedOutputStream
output_stream(&string_output_stream);
- output_stream.SetSerializationDeterministic(true);
- item.SerializeToCodedStream(&output_stream);
- return output;
- };
-
- google::protobuf::RepeatedPtrField<ItemPB> none_extracted_items;
- std::unordered_map<std::string, int> reversed_dict;
- for (const auto& [key, val] : item_dict) {
- reversed_dict[serialize_fn(val)] = key;
- }
-
- for (const auto& item : items) {
- if (filter(item)) {
- // Filter none extended items, mainly extended columns and
extended indexes
- *none_extracted_items.Add() = item;
- continue;
- }
- const std::string serialized_key = serialize_fn(item);
- auto it = reversed_dict.find(serialized_key);
- if (it == reversed_dict.end()) {
- // If any required item is missing in the dictionary, clear the
dict key list and return NotFound.
- // ATTN: need to clear dict key list let MS to add key list
- rowset_meta->clear_schema_dict_key_list();
- g_schema_dict_cache_miss_count << 1;
- return Status::NotFound<false>("Not found entry in dict");
- }
- // Add existed dict key to related dict
- add_dict_key_fn(it->second);
- }
- // clear extended items to prevent writing them to fdb
- if (result != nullptr) {
- result->Swap(&none_extracted_items);
- }
- return Status::OK();
-}
-
-Status SchemaCloudDictionaryCache::replace_schema_to_dict_keys(int64_t
index_id,
-
RowsetMetaCloudPB* rowset_meta) {
- if (!rowset_meta->has_variant_type_in_schema()) {
- return Status::OK();
- }
- // first attempt to get dict from cache
- auto dict = _lookup(index_id);
- if (!dict) {
- // if not found the dict in cache, then refresh the dict from remote
meta service
- RETURN_IF_ERROR(refresh_dict(index_id, &dict));
- }
- // here we should have the dict
- DCHECK(dict);
- RETURN_IF_ERROR(check_path_amibigus(*dict, rowset_meta));
- auto* dict_list = rowset_meta->mutable_schema_dict_key_list();
- // Process column dictionary: add keys for non-extended columns.
- auto column_filter = [&](const doris::ColumnPB& col) -> bool { return
col.unique_id() >= 0; };
- auto column_dict_adder = [&](int32_t key) {
dict_list->add_column_dict_key_list(key); };
- RETURN_IF_ERROR(process_dictionary<ColumnPB>(
- *dict, dict->column_dict(),
rowset_meta->mutable_tablet_schema()->mutable_column(),
- rowset_meta->tablet_schema().column(), column_filter,
column_dict_adder, rowset_meta));
-
- // Process index dictionary: add keys for indexes with an empty
index_suffix_name.
- auto index_filter = [&](const doris::TabletIndexPB& index_pb) -> bool {
- return index_pb.index_suffix_name().empty();
- };
- auto index_dict_adder = [&](int32_t key) {
dict_list->add_index_info_dict_key_list(key); };
- RETURN_IF_ERROR(process_dictionary<doris::TabletIndexPB>(
- *dict, dict->index_dict(),
rowset_meta->mutable_tablet_schema()->mutable_index(),
- rowset_meta->tablet_schema().index(), index_filter,
index_dict_adder, rowset_meta));
- g_replace_schema_to_dict_keys_hit_cache << 1;
- return Status::OK();
-}
-
-Status SchemaCloudDictionaryCache::_try_fill_schema(
- const std::shared_ptr<SchemaCloudDictionary>& dict, const
SchemaDictKeyList& dict_keys,
- TabletSchemaCloudPB* schema) {
- // Process column dictionary keys
- for (int key : dict_keys.column_dict_key_list()) {
- auto it = dict->column_dict().find(key);
- if (it == dict->column_dict().end()) {
- return Status::NotFound<false>("Column dict key {} not found",
key);
- }
- *schema->add_column() = it->second;
- }
- // Process index dictionary keys
- for (int key : dict_keys.index_info_dict_key_list()) {
- auto it = dict->index_dict().find(key);
- if (it == dict->index_dict().end()) {
- return Status::NotFound<false>("Index dict key {} not found", key);
- }
- *schema->add_index() = it->second;
- }
- return Status::OK();
-}
-
-Status SchemaCloudDictionaryCache::refresh_dict(int64_t index_id,
- SchemaCloudDictionarySPtr*
new_dict) {
- // First attempt: use the current cached dictionary.
- auto refresh_dict = std::make_shared<SchemaCloudDictionary>();
- RETURN_IF_ERROR(static_cast<const
CloudStorageEngine&>(ExecEnv::GetInstance()->storage_engine())
- .meta_mgr()
- .get_schema_dict(index_id, &refresh_dict));
- _insert(index_id, refresh_dict);
- if (new_dict != nullptr) {
- *new_dict = refresh_dict;
- }
- LOG(INFO) << "refresh dict for index_id=" << index_id;
- g_schema_dict_refresh_count << 1;
- return Status::OK();
-}
-
-Status SchemaCloudDictionaryCache::replace_dict_keys_to_schema(int64_t
index_id,
-
RowsetMetaCloudPB* out) {
- // First attempt: use the current cached dictionary
- SchemaCloudDictionarySPtr dict = _lookup(index_id);
- Status st =
- dict ? _try_fill_schema(dict, out->schema_dict_key_list(),
out->mutable_tablet_schema())
- : Status::NotFound<false>("Schema dict not found in cache");
-
- // If filling fails (possibly due to outdated dictionary data), refresh
the dictionary
- if (!st.ok()) {
- g_schema_dict_cache_miss_count << 1;
- RETURN_IF_ERROR(refresh_dict(index_id, &dict));
- if (!dict) {
- return Status::NotFound<false>("Schema dict not found after
refresh, index_id={}",
- index_id);
- }
- // Retry filling the schema with the refreshed dictionary
- st = _try_fill_schema(dict, out->schema_dict_key_list(),
out->mutable_tablet_schema());
- }
- g_replace_dict_keys_to_schema_hit_cache << 1;
- return st;
-}
-
-} // namespace doris
diff --git a/be/src/cloud/schema_cloud_dictionary_cache.h b/be/src/cloud/schema_cloud_dictionary_cache.h
deleted file mode 100644
index ed21b7db909..00000000000
--- a/be/src/cloud/schema_cloud_dictionary_cache.h
+++ /dev/null
@@ -1,101 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <gen_cpp/olap_file.pb.h>
-
-#include <memory>
-#include <string>
-#include <utility>
-
-#include "runtime/memory/lru_cache_policy.h"
-
-namespace doris {
-
-class SchemaCloudDictionary;
-class RowsetMetaCloudPB;
-
-using SchemaCloudDictionarySPtr = std::shared_ptr<SchemaCloudDictionary>;
-
-/*
- * SchemaCloudDictionaryCache provides a local cache for SchemaCloudDictionary.
- *
- * Caching logic:
- * - If the dictionary associated with a given key has not had any new
columns added
- * (determined by comparing the serialized data for consistency),
- * the cached dictionary is directly used to update the dictionary list in
the rowset meta
- * (similar to the process_dictionary logic in write_schema_dict).
- * - If new columns have been detected, the local cache is disregarded, and
the updated
- * dictionary should be fetched via the meta service.
- */
-class SchemaCloudDictionaryCache : public LRUCachePolicy {
-public:
- SchemaCloudDictionaryCache(size_t capacity)
- :
LRUCachePolicy(CachePolicy::CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE, capacity,
- LRUCacheType::NUMBER, 512) {}
- /**
- * Refreshes the dictionary for the given index_id by calling an RPC via
the meta manager.
- * The refreshed dictionary is then inserted into the cache.
- *
- * @param index_id The identifier for the index.
- * @param new_dict Optional output parameter; if provided, it will be set
to point to the refreshed dictionary.
- *
- * @return Status::OK if the dictionary is successfully refreshed;
otherwise, an error status.
- */
- virtual Status refresh_dict(int64_t index_id, SchemaCloudDictionarySPtr*
new_dict = nullptr);
-
- /**
- * Refreshes the dictionary for the given index_id by calling an RPC via
the meta manager.
- * The refreshed dictionary is then inserted into the cache.
- *
- * @param index_id The identifier for the index.
- * @param new_dict Optional output parameter; if provided, it will be set
to point to the refreshed dictionary.
- *
- * @return Status::OK if the dictionary is successfully refreshed;
otherwise, an error status.
- */
- Status replace_schema_to_dict_keys(int64_t index_id, RowsetMetaCloudPB*
out);
-
- /**
- * Replaces dictionary keys in the given RowsetMetaCloudPB by using the
cached dictionary.
- * If the cached dictionary is missing or its data is outdated (i.e.
missing required keys),
- * an RPC call is triggered to refresh the dictionary, which is then used
to fill the tablet schema.
- *
- * @param index_id The identifier for the index.
- * @param out Pointer to the RowsetMetaCloudPB whose tablet schema will be
updated.
- *
- * @return Status::OK if the tablet schema is successfully updated;
otherwise, an error status.
- */
- Status replace_dict_keys_to_schema(int64_t index_id, RowsetMetaCloudPB*
out);
-
-private:
- // ut
- friend class FakeSchemaCloudDictionaryCache;
- // insert dict
- void _insert(int64_t index_id, const SchemaCloudDictionarySPtr& dict);
- // lookup dict
- SchemaCloudDictionarySPtr _lookup(int64_t index_id);
- // Attempts to fill the tablet schema information in a
SchemaCloudDictionary into a TabletSchemaCloudPB
- // based on a given set of dictionary keys. If any required key is missing
in the dictionary, a NotFound status is returned.
- Status _try_fill_schema(const SchemaCloudDictionarySPtr& dict,
- const SchemaDictKeyList& dict_keys,
TabletSchemaCloudPB* schema);
- struct CacheValue : public LRUCacheValueBase {
- SchemaCloudDictionarySPtr dict;
- };
-};
-
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index de4ad4a94b4..1d424cbb023 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -158,17 +158,7 @@ TabletSchemaSPtr
BaseTablet::tablet_schema_with_merged_max_schema_version(
:
a->tablet_schema()->schema_version() <
b->tablet_schema()->schema_version());
});
- TabletSchemaSPtr target_schema = max_schema_version_rs->tablet_schema();
- if (target_schema->num_variant_columns() > 0) {
- // For variant columns tablet schema need to be the merged wide tablet
schema
- std::vector<TabletSchemaSPtr> schemas;
- std::transform(rowset_metas.begin(), rowset_metas.end(),
std::back_inserter(schemas),
- [](const RowsetMetaSharedPtr& rs_meta) { return
rs_meta->tablet_schema(); });
- static_cast<void>(
- vectorized::schema_util::get_least_common_schema(schemas,
nullptr, target_schema));
- VLOG_DEBUG << "dump schema: " << target_schema->dump_full_schema();
- }
- return target_schema;
+ return max_schema_version_rs->tablet_schema();
}
Status BaseTablet::set_tablet_state(TabletState state) {
@@ -189,21 +179,6 @@ void BaseTablet::update_max_version_schema(const
TabletSchemaSPtr& tablet_schema
}
}
-Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr&
update_schema) {
- std::lock_guard wrlock(_meta_lock);
- CHECK(_max_version_schema->schema_version() >=
update_schema->schema_version());
- TabletSchemaSPtr final_schema;
- bool check_column_size = true;
- VLOG_DEBUG << "dump _max_version_schema: " <<
_max_version_schema->dump_full_schema();
- VLOG_DEBUG << "dump update_schema: " << update_schema->dump_full_schema();
- RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
- {_max_version_schema, update_schema}, _max_version_schema,
final_schema,
- check_column_size));
- _max_version_schema = final_schema;
- VLOG_DEBUG << "dump updated tablet schema: " <<
final_schema->dump_full_schema();
- return Status::OK();
-}
-
uint32_t BaseTablet::get_real_compaction_score() const {
std::shared_lock l(_meta_lock);
const auto& rs_metas = _tablet_meta->all_rs_metas();
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index e1c469cb97a..4267d182cfc 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -93,8 +93,6 @@ public:
void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
- Status update_by_least_common_schema(const TabletSchemaSPtr&
update_schema);
-
TabletSchemaSPtr tablet_schema() const {
std::shared_lock rlock(_meta_lock);
return _max_version_schema;
@@ -296,12 +294,6 @@ public:
const std::vector<RowsetSharedPtr>&
candidate_rowsets,
int64_t limit);
- // Return the merged schema of all rowsets
- virtual TabletSchemaSPtr merged_tablet_schema() const {
- std::shared_lock rlock(_meta_lock);
- return _max_version_schema;
- }
-
void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
bool include_stale = false) {
std::shared_lock rlock(_meta_lock);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 5a7455df362..4794f143460 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -839,13 +839,10 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
_rowset_meta->set_newest_write_timestamp(UnixSeconds());
}
- // update rowset meta tablet schema if tablet schema updated
- auto rowset_schema = _context.merged_tablet_schema != nullptr ?
_context.merged_tablet_schema
- :
_context.tablet_schema;
- _rowset_meta->set_tablet_schema(rowset_schema);
+ _rowset_meta->set_tablet_schema(_context.tablet_schema);
// If segment compaction occurs, the idx file info will become inaccurate.
- if (rowset_schema->has_inverted_index() && _num_segcompacted == 0) {
+ if (_context.tablet_schema->has_inverted_index() && _num_segcompacted ==
0) {
if (auto idx_files_info =
_idx_files.inverted_index_file_info(_segment_start_id);
!idx_files_info.has_value()) [[unlikely]] {
LOG(ERROR) << "expected inverted index files info, but none
presents: "
@@ -855,9 +852,10 @@ Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
}
}
- RETURN_NOT_OK_STATUS_WITH_WARN(RowsetFactory::create_rowset(rowset_schema,
_context.tablet_path,
- _rowset_meta,
&rowset),
- "rowset init failed when build new rowset");
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ RowsetFactory::create_rowset(_context.tablet_schema,
_context.tablet_path, _rowset_meta,
+ &rowset),
+ "rowset init failed when build new rowset");
_already_built = true;
return Status::OK();
}
@@ -870,28 +868,6 @@ int64_t BetaRowsetWriter::_num_seg() const {
return is_segcompacted() ? _num_segcompacted : _num_segment;
}
-// update tablet schema when meet variant columns, before commit_txn
-// Eg. rowset schema: A(int), B(float), C(int), D(int)
-// _tabelt->tablet_schema: A(bigint), B(double)
-// => update_schema: A(bigint), B(double), C(int), D(int)
-Status BaseBetaRowsetWriter::update_rowset_schema(TabletSchemaSPtr
flush_schema) {
- std::lock_guard<std::mutex> lock(*(_context.schema_lock));
- TabletSchemaSPtr update_schema;
- if (_context.merged_tablet_schema == nullptr) {
- _context.merged_tablet_schema = _context.tablet_schema;
- }
- RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(
- {_context.merged_tablet_schema, flush_schema}, nullptr,
update_schema));
- CHECK_GE(update_schema->num_columns(), flush_schema->num_columns())
- << "Rowset merge schema columns count is " <<
update_schema->num_columns()
- << ", but flush_schema is larger " << flush_schema->num_columns()
- << " update_schema: " << update_schema->dump_structure()
- << " flush_schema: " << flush_schema->dump_structure();
- _context.merged_tablet_schema.swap(update_schema);
- VLOG_DEBUG << "dump rs schema: " <<
_context.tablet_schema->dump_structure();
- return Status::OK();
-}
-
Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool
check_segment_num) {
int64_t num_rows_written = 0;
int64_t total_data_size = 0;
@@ -1082,8 +1058,7 @@ Status
BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
return Status::OK();
}
-Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat,
- TabletSchemaSPtr flush_schema) {
+Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat) {
uint32_t segid_offset = segment_id - _segment_start_id;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
@@ -1105,10 +1080,7 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t
segment_id, const SegmentStati
_num_segment++;
}
}
- // tablet schema updated
- if (flush_schema != nullptr) {
- RETURN_IF_ERROR(update_rowset_schema(flush_schema));
- }
+
if (_context.mow_context != nullptr) {
// ensure that the segment file writing is complete
auto* file_writer = _seg_files.get(segment_id);
@@ -1120,10 +1092,8 @@ Status BaseBetaRowsetWriter::add_segment(uint32_t
segment_id, const SegmentStati
return Status::OK();
}
-Status BetaRowsetWriter::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat,
- TabletSchemaSPtr flush_schema) {
- RETURN_IF_ERROR(
- BaseBetaRowsetWriter::add_segment(segment_id, segstat,
std::move(flush_schema)));
+Status BetaRowsetWriter::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat) {
+ RETURN_IF_ERROR(BaseBetaRowsetWriter::add_segment(segment_id, segstat));
return _segcompaction_if_necessary();
}
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 9cb72896f74..8429e30f4c3 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -131,8 +131,7 @@ public:
Status create_index_file_writer(uint32_t segment_id, IndexFileWriterPtr*
writer) override;
- Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
- TabletSchemaSPtr flush_schema) override;
+ Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat)
override;
Status flush() override;
@@ -202,7 +201,6 @@ public:
}
private:
- Status update_rowset_schema(TabletSchemaSPtr flush_schema);
// build a tmp rowset for load segment to calc delete_bitmap
// for this segment
protected:
@@ -280,8 +278,7 @@ public:
Status init(const RowsetWriterContext& rowset_writer_context) override;
- Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
- TabletSchemaSPtr flush_schema) override;
+ Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat)
override;
Status flush_segment_writer_for_segcompaction(
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t
index_size,
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
index 3a2059afb95..92092110bea 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.cpp
@@ -79,12 +79,11 @@ Status BetaRowsetWriterV2::create_file_writer(uint32_t
segment_id, io::FileWrite
return Status::OK();
}
-Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat,
- TabletSchemaSPtr flush_schema) {
+Status BetaRowsetWriterV2::add_segment(uint32_t segment_id, const
SegmentStatistics& segstat) {
bool ok = false;
for (const auto& stream : _streams) {
auto st = stream->add_segment(_context.partition_id,
_context.index_id, _context.tablet_id,
- segment_id, segstat, flush_schema);
+ segment_id, segstat);
if (!st.ok()) {
LOG(WARNING) << "failed to add segment " << segment_id << " to
stream "
<< stream->stream_id();
diff --git a/be/src/olap/rowset/beta_rowset_writer_v2.h
b/be/src/olap/rowset/beta_rowset_writer_v2.h
index a5d367b00b9..69504f5dc1d 100644
--- a/be/src/olap/rowset/beta_rowset_writer_v2.h
+++ b/be/src/olap/rowset/beta_rowset_writer_v2.h
@@ -112,8 +112,7 @@ public:
return Status::OK();
}
- Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat,
- TabletSchemaSPtr flush_schema) override;
+ Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat)
override;
int32_t allocate_segment_id() override { return
_segment_creator.allocate_segment_id(); };
diff --git a/be/src/olap/rowset/rowset_writer.h
b/be/src/olap/rowset/rowset_writer.h
index 060b32b9640..8809b34f87a 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -142,8 +142,7 @@ public:
"RowsetWriter not support flush_single_block");
}
- virtual Status add_segment(uint32_t segment_id, const SegmentStatistics&
segstat,
- TabletSchemaSPtr flush_schema) {
+ virtual Status add_segment(uint32_t segment_id, const SegmentStatistics&
segstat) {
return Status::NotSupported("RowsetWriter does not support
add_segment");
}
diff --git a/be/src/olap/rowset/rowset_writer_context.h
b/be/src/olap/rowset/rowset_writer_context.h
index 68cf689907e..97984b1fa15 100644
--- a/be/src/olap/rowset/rowset_writer_context.h
+++ b/be/src/olap/rowset/rowset_writer_context.h
@@ -51,8 +51,6 @@ struct RowsetWriterContext {
RowsetTypePB rowset_type {BETA_ROWSET};
TabletSchemaSPtr tablet_schema;
- // for variant schema update
- TabletSchemaSPtr merged_tablet_schema;
// PREPARED/COMMITTED for pending rowset
// VISIBLE for non-pending rowset
RowsetStatePB rowset_state {PREPARED};
diff --git a/be/src/olap/rowset/segment_creator.cpp
b/be/src/olap/rowset/segment_creator.cpp
index ab162dff872..08d331d9559 100644
--- a/be/src/olap/rowset/segment_creator.cpp
+++ b/be/src/olap/rowset/segment_creator.cpp
@@ -74,12 +74,12 @@ Status SegmentFlusher::flush_single_block(const
vectorized::Block* block, int32_
std::unique_ptr<segment_v2::VerticalSegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id,
no_compression));
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0,
flush_block.rows()));
- RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(),
flush_size));
+ RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
} else {
std::unique_ptr<segment_v2::SegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(writer, segment_id,
no_compression));
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_add_rows(writer, &flush_block, 0,
flush_block.rows()));
- RETURN_IF_ERROR(_flush_segment_writer(writer, writer->flush_schema(),
flush_size));
+ RETURN_IF_ERROR(_flush_segment_writer(writer, flush_size));
}
return Status::OK();
}
@@ -207,8 +207,7 @@ Status SegmentFlusher::_create_segment_writer(
}
Status SegmentFlusher::_flush_segment_writer(
- std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
TabletSchemaSPtr flush_schema,
- int64_t* flush_size) {
+ std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer, int64_t*
flush_size) {
uint32_t row_num = writer->num_rows_written();
_num_rows_updated += writer->num_rows_updated();
_num_rows_deleted += writer->num_rows_deleted();
@@ -253,7 +252,7 @@ Status SegmentFlusher::_flush_segment_writer(
writer.reset();
- RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat,
flush_schema));
+ RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
if (flush_size) {
*flush_size = segment_file_size;
@@ -262,7 +261,7 @@ Status SegmentFlusher::_flush_segment_writer(
}
Status
SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
writer,
- TabletSchemaSPtr flush_schema,
int64_t* flush_size) {
+ int64_t* flush_size) {
uint32_t row_num = writer->num_rows_written();
_num_rows_updated += writer->num_rows_updated();
_num_rows_deleted += writer->num_rows_deleted();
@@ -306,7 +305,7 @@ Status
SegmentFlusher::_flush_segment_writer(std::unique_ptr<segment_v2::Segment
writer.reset();
- RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat,
flush_schema));
+ RETURN_IF_ERROR(_context.segment_collector->add(segment_id, segstat));
if (flush_size) {
*flush_size = segment_file_size;
diff --git a/be/src/olap/rowset/segment_creator.h
b/be/src/olap/rowset/segment_creator.h
index 6fc4df5c5d2..19303e225f8 100644
--- a/be/src/olap/rowset/segment_creator.h
+++ b/be/src/olap/rowset/segment_creator.h
@@ -75,8 +75,7 @@ class SegmentCollector {
public:
virtual ~SegmentCollector() = default;
- virtual Status add(uint32_t segment_id, SegmentStatistics& segstat,
- TabletSchemaSPtr flush_chema) = 0;
+ virtual Status add(uint32_t segment_id, SegmentStatistics& segstat) = 0;
};
template <class T>
@@ -85,9 +84,8 @@ class SegmentCollectorT : public SegmentCollector {
public:
explicit SegmentCollectorT(T* t) : _t(t) {}
- Status add(uint32_t segment_id, SegmentStatistics& segstat,
- TabletSchemaSPtr flush_chema) override {
- return _t->add_segment(segment_id, segstat, flush_chema);
+ Status add(uint32_t segment_id, SegmentStatistics& segstat) override {
+ return _t->add_segment(segment_id, segstat);
}
private:
@@ -155,10 +153,8 @@ private:
Status
_create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>&
writer,
int32_t segment_id, bool no_compression =
false);
Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>&
writer,
- TabletSchemaSPtr flush_schema = nullptr,
int64_t* flush_size = nullptr);
Status
_flush_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>&
writer,
- TabletSchemaSPtr flush_schema = nullptr,
int64_t* flush_size = nullptr);
private:
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h
b/be/src/olap/rowset/segment_v2/segment_writer.h
index c58ee417864..cd6d838b987 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -140,8 +140,6 @@ public:
void clear();
- TabletSchemaSPtr flush_schema() const { return _flush_schema; };
-
void set_mow_context(std::shared_ptr<MowContext> mow_context);
Status close_inverted_index(int64_t* inverted_index_file_size) {
@@ -260,8 +258,6 @@ private:
std::shared_ptr<MowContext> _mow_context;
// group every rowset-segment row id to speed up reader
std::map<RowsetId, RowsetSharedPtr> _rsid_to_rowset;
- // contains auto generated columns, should be nullptr if no variants's
subcolumns
- TabletSchemaSPtr _flush_schema = nullptr;
std::vector<std::string> _primary_keys;
uint64_t _primary_keys_size = 0;
// variant statistics calculator for efficient stats collection
diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
index b544eb0f74f..398ac0bb583 100644
--- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h
@@ -117,8 +117,6 @@ public:
Slice min_encoded_key();
Slice max_encoded_key();
- TabletSchemaSPtr flush_schema() const { return _flush_schema; };
-
void clear();
Status close_inverted_index(int64_t* inverted_index_file_size) {
@@ -262,9 +260,6 @@ private:
std::vector<RowsInBlock> _batched_blocks;
- // contains auto generated columns, should be nullptr if no variants's
subcolumns
- TabletSchemaSPtr _flush_schema = nullptr;
-
BlockAggregator _block_aggregator;
};
diff --git a/be/src/olap/rowset_builder.cpp b/be/src/olap/rowset_builder.cpp
index b9391f55a20..0329c6d9191 100644
--- a/be/src/olap/rowset_builder.cpp
+++ b/be/src/olap/rowset_builder.cpp
@@ -335,25 +335,6 @@ Status RowsetBuilder::commit_txn() {
std::lock_guard<std::mutex> l(_lock);
SCOPED_TIMER(_commit_txn_timer);
- const RowsetWriterContext& rw_ctx = _rowset_writer->context();
- if (rw_ctx.tablet_schema->num_variant_columns() > 0 && _rowset->num_rows()
> 0) {
- // Need to merge schema with `rw_ctx.merged_tablet_schema` in prior,
- // merged schema keeps the newest merged schema for the rowset, which
is updated and merged
- // during flushing segments.
- if (rw_ctx.merged_tablet_schema != nullptr) {
-
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.merged_tablet_schema));
- } else {
- // We should merge rowset schema further, in case that the
merged_tablet_schema maybe null
- // when enable_memtable_on_sink_node is true, the
merged_tablet_schema will not be passed to
- // the destination backend.
- // update tablet schema when meet variant columns, before
commit_txn
- // Eg. rowset schema: A(int), B(float), C(int), D(int)
- // _tabelt->tablet_schema: A(bigint), B(double)
- // => update_schema: A(bigint), B(double), C(int), D(int)
-
RETURN_IF_ERROR(tablet()->update_by_least_common_schema(rw_ctx.tablet_schema));
- }
- }
-
// Transfer ownership of `PendingRowsetGuard` to `TxnManager`
Status res = _engine.txn_manager()->commit_txn(
_req.partition_id, *tablet(), _req.txn_id, _req.load_id, _rowset,
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 87776523905..d042e4ea752 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -221,11 +221,6 @@ Status TabletStream::add_segment(const PStreamHeader&
header, butil::IOBuf* data
SCOPED_TIMER(_add_segment_timer);
DCHECK(header.has_segment_statistics());
SegmentStatistics stat(header.segment_statistics());
- TabletSchemaSPtr flush_schema;
- if (header.has_flush_schema()) {
- flush_schema = std::make_shared<TabletSchema>();
- flush_schema->init_from_pb(header.flush_schema());
- }
int64_t src_id = header.src_id();
uint32_t segid = header.segment_id();
@@ -252,9 +247,9 @@ Status TabletStream::add_segment(const PStreamHeader&
header, butil::IOBuf* data
}
DCHECK(new_segid != std::numeric_limits<uint32_t>::max());
- auto add_segment_func = [this, new_segid, stat, flush_schema]() {
+ auto add_segment_func = [this, new_segid, stat]() {
signal::set_signal_task_id(_load_id);
- auto st = _load_stream_writer->add_segment(new_segid, stat,
flush_schema);
+ auto st = _load_stream_writer->add_segment(new_segid, stat);
DBUG_EXECUTE_IF("TabletStream.add_segment.add_segment_failed",
{ st = Status::InternalError("fault injection"); });
if (!st.ok()) {
diff --git a/be/src/runtime/load_stream_writer.cpp
b/be/src/runtime/load_stream_writer.cpp
index 666c9c57fa9..5a77ee283c3 100644
--- a/be/src/runtime/load_stream_writer.cpp
+++ b/be/src/runtime/load_stream_writer.cpp
@@ -182,8 +182,7 @@ Status LoadStreamWriter::close_writer(uint32_t segid,
FileType file_type) {
return Status::OK();
}
-Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics&
stat,
- TabletSchemaSPtr flush_schema) {
+Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics&
stat) {
SCOPED_ATTACH_TASK(_resource_ctx);
size_t segment_file_size = 0;
size_t inverted_file_size = 0;
@@ -209,7 +208,7 @@ Status LoadStreamWriter::add_segment(uint32_t segid, const
SegmentStatistics& st
segid, segment_file_size, inverted_file_size, stat.data_size,
_req.tablet_id);
}
- return _rowset_writer->add_segment(segid, stat, flush_schema);
+ return _rowset_writer->add_segment(segid, stat);
}
Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type,
size_t* file_size) {
diff --git a/be/src/runtime/load_stream_writer.h
b/be/src/runtime/load_stream_writer.h
index c85d0c48d42..e27db946231 100644
--- a/be/src/runtime/load_stream_writer.h
+++ b/be/src/runtime/load_stream_writer.h
@@ -60,7 +60,7 @@ public:
Status close_writer(uint32_t segid, FileType file_type);
- Status add_segment(uint32_t segid, const SegmentStatistics& stat,
TabletSchemaSPtr flush_chema);
+ Status add_segment(uint32_t segid, const SegmentStatistics& stat);
Status pre_close() {
std::lock_guard<std::mutex> l(_lock);
diff --git a/be/src/runtime/memory/cache_policy.h
b/be/src/runtime/memory/cache_policy.h
index 68f73623c57..64ef2f19e36 100644
--- a/be/src/runtime/memory/cache_policy.h
+++ b/be/src/runtime/memory/cache_policy.h
@@ -102,8 +102,6 @@ public:
return "QueryCache";
case CacheType::TABLET_COLUMN_OBJECT_POOL:
return "TabletColumnObjectPool";
- case CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE:
- return "SchemaCloudDictionaryCache";
default:
throw Exception(Status::FatalError("not match type of cache policy
:{}",
static_cast<int>(type)));
@@ -133,7 +131,6 @@ public:
{"ForUTCacheNumber", CacheType::FOR_UT_CACHE_NUMBER},
{"QueryCache", CacheType::QUERY_CACHE},
{"TabletColumnObjectPool", CacheType::TABLET_COLUMN_OBJECT_POOL},
- {"SchemaCloudDictionaryCache",
CacheType::SCHEMA_CLOUD_DICTIONARY_CACHE},
};
static CacheType string_to_type(std::string type) {
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 3732fde5e61..fb971e2a1a2 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1244,10 +1244,11 @@ void
PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcControlle
LOG(WARNING) << "tablet does not exist, tablet id is "
<< tablet_id;
continue;
}
- auto schema = res.value()->merged_tablet_schema();
- if (schema != nullptr) {
- tablet_schemas.push_back(schema);
- }
+ // TODO(lihangyu): implement this
+ // auto schema = res.value()->merged_tablet_schema();
+ // if (schema != nullptr) {
+ // tablet_schemas.push_back(schema);
+ // }
}
if (!tablet_schemas.empty()) {
// merge all
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 9acc69e3379..17ae49eb570 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -234,8 +234,7 @@ Status LoadStreamStub::append_data(int64_t partition_id,
int64_t index_id, int64
// ADD_SEGMENT
Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id,
int64_t tablet_id,
- int32_t segment_id, const
SegmentStatistics& segment_stat,
- TabletSchemaSPtr flush_schema) {
+ int32_t segment_id, const
SegmentStatistics& segment_stat) {
if (!_is_open.load()) {
add_failed_tablet(tablet_id, _status);
return _status;
@@ -250,9 +249,6 @@ Status LoadStreamStub::add_segment(int64_t partition_id,
int64_t index_id, int64
header.set_segment_id(segment_id);
header.set_opcode(doris::PStreamHeader::ADD_SEGMENT);
segment_stat.to_pb(header.mutable_segment_statistics());
- if (flush_schema != nullptr) {
- flush_schema->to_schema_pb(header.mutable_flush_schema());
- }
return _encode_and_send(header);
}
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 67791272f38..066805087a6 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -144,8 +144,7 @@ public:
// ADD_SEGMENT
Status add_segment(int64_t partition_id, int64_t index_id, int64_t
tablet_id,
- int32_t segment_id, const SegmentStatistics&
segment_stat,
- TabletSchemaSPtr flush_schema);
+ int32_t segment_id, const SegmentStatistics&
segment_stat);
// CLOSE_LOAD
Status close_load(const std::vector<PTabletID>& tablets_to_commit);
diff --git a/be/test/cloud/test_schema_cloud_dictionary_cache.cpp
b/be/test/cloud/test_schema_cloud_dictionary_cache.cpp
deleted file mode 100644
index 0fc4fd0c3f5..00000000000
--- a/be/test/cloud/test_schema_cloud_dictionary_cache.cpp
+++ /dev/null
@@ -1,232 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "cloud/schema_cloud_dictionary_cache.cpp"
-#include "cloud/schema_cloud_dictionary_cache.h"
-#include "gen_cpp/olap_file.pb.h"
-#include "gtest/gtest.h"
-#include "vec/json/path_in_data.h"
-
-namespace doris {
-
-using SchemaCloudDictionarySPtr = std::shared_ptr<SchemaCloudDictionary>;
-
-/*
- * FakeSchemaCloudDictionaryCache is a test subclass which allows injection of
dictionary entries
- * and overrides refresh_dict to simulate RPC refresh.
- */
-class FakeSchemaCloudDictionaryCache : public SchemaCloudDictionaryCache {
-public:
- FakeSchemaCloudDictionaryCache(size_t capacity) :
SchemaCloudDictionaryCache(capacity) {}
-
- // For unit testing, we override refresh_dict to simulate different
scenarios.
- // (Assume the base class method is declared virtual for testing or we
hide it in our subclass)
- Status refresh_dict(int64_t index_id, SchemaCloudDictionarySPtr* new_dict
= nullptr) override {
- if (simulate_refresh_success) {
- // Simulate a successful refresh by creating a valid dictionary.
- SchemaCloudDictionarySPtr valid_dict = createValidDictionary();
- // Inject the dictionary into cache.
- TestInsert(index_id, valid_dict);
- if (new_dict) {
- *new_dict = valid_dict;
- }
- return Status::OK();
- } else {
- return Status::InternalError("Simulated refresh failure");
- }
- }
-
- // Public wrapper for injection (assume _insert is accessible, e.g.
changed to protected for unit test)
- void TestInsert(int64_t index_id, const SchemaCloudDictionarySPtr& dict) {
- _insert(index_id, dict);
- }
-
- // Flag to control refresh_dict to simulate refresh results.
- bool simulate_refresh_success = true;
-
- // Create a valid SchemaCloudDictionary with expected keys.
- static SchemaCloudDictionarySPtr createValidDictionary() {
- auto* dict = new SchemaCloudDictionary();
- // Populate valid column entry with key 101.
- auto& col_dict = *dict->mutable_column_dict();
- ColumnPB* col_pb = &(col_dict)[101];
- col_pb->set_unique_id(101);
- // Populate valid index entry with key 201. Set index_suffix_name to
empty.
- auto& idx_dict = *dict->mutable_index_dict();
- TabletIndexPB* idx_pb = &(idx_dict)[201];
- idx_pb->set_index_suffix_name("");
- return SchemaCloudDictionarySPtr(dict);
- }
-
- // Create an invalid SchemaCloudDictionary (missing column key 101)
- static SchemaCloudDictionarySPtr createInvalidDictionary() {
- auto* dict = new SchemaCloudDictionary();
- // Insert a column with a wrong key example 999 rather than 101.
- auto& col_dict = *dict->mutable_column_dict();
- ColumnPB* col_pb = &(col_dict)[999];
- col_pb->set_unique_id(999);
- // 正常的 index 数据.
- auto& idx_dict = *dict->mutable_index_dict();
- TabletIndexPB* idx_pb = &(idx_dict)[201];
- idx_pb->set_index_suffix_name("");
- return SchemaCloudDictionarySPtr(dict);
- }
-};
-
-// Test case 1: Cached dictionary valid, _try_fill_schema returns OK.
-TEST(SchemaCloudDictionaryCacheTest, ReplaceDictKeysToSchema_ValidCache) {
- int64_t index_id = 100;
- FakeSchemaCloudDictionaryCache cache(10);
- // Inject a valid dictionary into cache.
- SchemaCloudDictionarySPtr valid_dict =
FakeSchemaCloudDictionaryCache::createValidDictionary();
- cache.TestInsert(index_id, valid_dict);
-
- // Create a RowsetMetaCloudPB with schema dictionary key list.
- RowsetMetaCloudPB rs_meta;
- // For testing, add expected column key (101) and index key (201).
- SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list();
- dict_keys->add_column_dict_key_list(101);
- dict_keys->add_index_info_dict_key_list(201);
- // Ensure tablet schema message is created.
- rs_meta.mutable_tablet_schema();
-
- // Call replace_dict_keys_to_schema.
- Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta);
- EXPECT_TRUE(st.ok());
-
- // Check that the tablet schema was filled.
- const TabletSchemaCloudPB& schema = rs_meta.tablet_schema();
- EXPECT_EQ(schema.column_size(), 1);
- EXPECT_EQ(schema.index_size(), 1);
-}
-
-// Test case 2: Cached dictionary invalid, triggers refresh then succeeds.
-TEST(SchemaCloudDictionaryCacheTest,
ReplaceDictKeysToSchema_InvalidCache_ThenRefresh) {
- int64_t index_id = 200;
- FakeSchemaCloudDictionaryCache cache(10);
- // Inject an invalid dictionary (missing required column key 101).
- SchemaCloudDictionarySPtr invalid_dict =
- FakeSchemaCloudDictionaryCache::createInvalidDictionary();
- cache.TestInsert(index_id, invalid_dict);
-
- // Create rowset meta with keys expecting valid dictionary.
- RowsetMetaCloudPB rs_meta;
- SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list();
- dict_keys->add_column_dict_key_list(101); // invalid dict does not contain
101.
- dict_keys->add_index_info_dict_key_list(201);
- rs_meta.mutable_tablet_schema();
-
- cache.simulate_refresh_success = true;
- Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta);
- EXPECT_TRUE(st.ok());
-
- // After refresh, the valid dictionary should be used.
- const TabletSchemaCloudPB& schema = rs_meta.tablet_schema();
- EXPECT_EQ(schema.column_size(), 1);
- EXPECT_EQ(schema.index_size(), 1);
-}
-
-// Test case 3: No dictionary in cache, refresh is triggered and succeeds.
-TEST(SchemaCloudDictionaryCacheTest,
ReplaceDictKeysToSchema_NoCache_ThenRefresh) {
- int64_t index_id = 300;
- FakeSchemaCloudDictionaryCache cache(10);
- // Not injecting any dictionary so that _lookup returns null.
- RowsetMetaCloudPB rs_meta;
- SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list();
- dict_keys->add_column_dict_key_list(101);
- dict_keys->add_index_info_dict_key_list(201);
- rs_meta.mutable_tablet_schema();
-
- // Refresh should be triggered.
- cache.simulate_refresh_success = true;
- Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta);
- EXPECT_TRUE(st.ok());
-
- const TabletSchemaCloudPB& schema = rs_meta.tablet_schema();
- EXPECT_EQ(schema.column_size(), 1);
- EXPECT_EQ(schema.index_size(), 1);
-}
-
-// Test case 4: Refresh fails, replace_dict_keys_to_schema returns error.
-TEST(SchemaCloudDictionaryCacheTest, ReplaceDictKeysToSchema_RefreshFailure) {
- int64_t index_id = 400;
- FakeSchemaCloudDictionaryCache cache(10);
- // Ensure no valid dictionary in cache.
- RowsetMetaCloudPB rs_meta;
- SchemaDictKeyList* dict_keys = rs_meta.mutable_schema_dict_key_list();
- dict_keys->add_column_dict_key_list(101);
- dict_keys->add_index_info_dict_key_list(201);
- rs_meta.mutable_tablet_schema();
-
- cache.simulate_refresh_success = false;
- Status st = cache.replace_dict_keys_to_schema(index_id, &rs_meta);
- EXPECT_FALSE(st.ok());
-}
-
-// Test case 5: replace_schema_to_dict_keys with
tablet_schema.enable_variant_flatten_nested = true
-TEST(SchemaCloudDictionaryCacheTest,
ProcessDictionary_VariantPathConflict_Throws) {
- SchemaCloudDictionarySPtr dict = std::make_shared<SchemaCloudDictionary>();
- // construct two variant columns with same unique_id but different
path_info
- auto& col_dict = *dict->mutable_column_dict();
- ColumnPB* col1 = &(col_dict)[101];
- col1->set_unique_id(101);
- vectorized::PathInDataBuilder builder1;
- builder1.append("v", false).append("nested", true).append("a", false);
- vectorized::PathInData path_in_data1 = builder1.build();
- segment_v2::ColumnPathInfo path_info1;
- path_in_data1.to_protobuf(&path_info1, 0);
- col1->mutable_column_path_info()->CopyFrom(path_info1);
- {
- RowsetMetaCloudPB rs_meta;
- rs_meta.set_has_variant_type_in_schema(true);
- auto* schema = rs_meta.mutable_tablet_schema();
- schema->set_enable_variant_flatten_nested(true);
- // add two columns with same key but different is_nested value
- auto* col_schema1 = schema->add_column();
- col_schema1->set_unique_id(101);
- // create pathIndata with same key but different is_nested value
- vectorized::PathInDataBuilder builder3;
- builder3.append("v", false).append("nested", false).append("a", false);
- vectorized::PathInData path_in_data3 = builder3.build();
- segment_v2::ColumnPathInfo path_info3;
- path_in_data3.to_protobuf(&path_info3, 0);
- col_schema1->mutable_column_path_info()->CopyFrom(path_info3);
- auto st = check_path_amibigus(*dict, &rs_meta);
- EXPECT_FALSE(st.ok());
- EXPECT_EQ(st.code(), TStatusCode::DATA_QUALITY_ERROR);
- }
-
- {
- RowsetMetaCloudPB rs_meta;
- rs_meta.set_has_variant_type_in_schema(true);
- auto* schema = rs_meta.mutable_tablet_schema();
- // add two columns with same key but same is_nested value
- auto* col_schema3 = schema->add_column();
- col_schema3->set_unique_id(101);
- vectorized::PathInDataBuilder builder5;
- builder5.append("v", false).append("nested", true).append("a", false);
- vectorized::PathInData path_in_data5 = builder5.build();
- segment_v2::ColumnPathInfo path_info5;
- path_in_data5.to_protobuf(&path_info5, 0);
- col_schema3->mutable_column_path_info()->CopyFrom(path_info5);
- // assert no exception
- auto st = check_path_amibigus(*dict, &rs_meta);
- EXPECT_TRUE(st.ok()) << st.to_string();
- }
-}
-
-} // namespace doris
\ No newline at end of file
diff --git a/cloud/src/meta-service/meta_service_schema.cpp
b/cloud/src/meta-service/meta_service_schema.cpp
index bb82e049427..4dfa154a83b 100644
--- a/cloud/src/meta-service/meta_service_schema.cpp
+++ b/cloud/src/meta-service/meta_service_schema.cpp
@@ -208,6 +208,7 @@ void process_dictionary(SchemaCloudDictionary& dict,
}
}
+// **Notice**: Do not remove this code. We need this interface until all of the BE has been upgraded to 4.0.x
// Writes schema dictionary metadata to RowsetMetaCloudPB.
// Schema was extended in BE side, we need to reset schema to original frontend schema and store
// such restored schema in fdb. And also add extra dict key info to RowsetMetaCloudPB.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]