This is an automated email from the ASF dual-hosted git repository. pengxiangyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 6a1e3d3435 [fix](cooldown)Fix bug for single cooldown compaction, add remote meta (#16812) 6a1e3d3435 is described below commit 6a1e3d3435ca95abbe7a62ba071a909041dba60d Author: pengxiangyu <diablo...@163.com> AuthorDate: Fri Feb 17 15:13:06 2023 +0800 [fix](cooldown)Fix bug for single cooldown compaction, add remote meta (#16812) * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction * fix bug, add remote meta for compaction --- be/src/olap/cold_data_compaction.cpp | 15 ++++- be/src/olap/storage_policy.cpp | 18 ++++++ be/src/olap/storage_policy.h | 3 + be/src/olap/tablet.cpp | 71 ++++++++++++++-------- be/src/olap/tablet.h | 13 ++-- .../apache/doris/catalog/TabletInvertedIndex.java | 5 +- 6 files changed, 88 insertions(+), 37 deletions(-) diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp index 4b06ee7616..9f24c9c170 100644 --- a/be/src/olap/cold_data_compaction.cpp +++ b/be/src/olap/cold_data_compaction.cpp @@ -19,6 +19,7 @@ #include "common/compiler_util.h" #include "olap/compaction.h" +#include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset.h" namespace doris { @@ -60,14 +61,26 @@ Status ColdDataCompaction::pick_rowsets_to_compact() { } Status ColdDataCompaction::modify_rowsets() { + UniqueId cooldown_meta_id = UniqueId::gen_uid(); + + // write remote tablet meta + std::shared_ptr<io::RemoteFileSystem> fs; + RETURN_IF_ERROR(get_remote_file_system(_tablet->storage_policy_id(), &fs)); + std::vector<RowsetMetaSharedPtr> to_deletes; + for (auto& rs : _input_rowsets) { + to_deletes.emplace_back(rs->rowset_meta()); + } + RETURN_IF_ERROR(_tablet->write_cooldown_meta(fs, cooldown_meta_id, + _output_rowset->rowset_meta(), to_deletes)); { std::lock_guard wlock(_tablet->get_header_lock()); // Merged cooldowned rowsets MUST NOT be managed by version graph, they will be reclaimed by `remove_unused_remote_files`. _tablet->delete_rowsets(_input_rowsets, false); _tablet->add_rowsets({_output_rowset}); // TODO(plat1ko): process primary key - _tablet->tablet_meta()->set_cooldown_meta_id(UniqueId::gen_uid()); + _tablet->tablet_meta()->set_cooldown_meta_id(cooldown_meta_id); } + { std::shared_lock rlock(_tablet->get_header_lock()); _tablet->save_meta(); diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp index c227534907..68f561397d 100644 --- a/be/src/olap/storage_policy.cpp +++ b/be/src/olap/storage_policy.cpp @@ -31,6 +31,24 @@ struct StoragePolicyMgr { static StoragePolicyMgr s_storage_policy_mgr; +Status get_remote_file_system(int64_t storage_policy_id, + std::shared_ptr<io::RemoteFileSystem>* fs) { + auto storage_policy = get_storage_policy(storage_policy_id); + if (storage_policy == nullptr) { + return Status::InternalError("could not find storage_policy, storage_policy_id={}", + storage_policy_id); + } + auto resource = get_storage_resource(storage_policy->resource_id); + *fs = std::static_pointer_cast<io::RemoteFileSystem>(resource.fs); + if (*fs == nullptr) { + return Status::InternalError("could not find resource, resouce_id={}", + storage_policy->resource_id); + } + DCHECK(atol((*fs)->id().c_str()) == storage_policy->resource_id); + DCHECK((*fs)->type() != io::FileSystemType::LOCAL); + return Status::OK(); +} + StoragePolicyPtr get_storage_policy(int64_t id) { std::lock_guard lock(s_storage_policy_mgr.mtx); if (auto it = s_storage_policy_mgr.map.find(id); it != s_storage_policy_mgr.map.end()) { diff --git a/be/src/olap/storage_policy.h b/be/src/olap/storage_policy.h index 4ddac3d36f..02d86603fa 100644 --- a/be/src/olap/storage_policy.h +++ b/be/src/olap/storage_policy.h @@ -18,6 +18,7 @@ #pragma once #include "io/fs/file_system.h" +#include "io/fs/remote_file_system.h" namespace doris { @@ -37,6 +38,8 @@ struct StoragePolicy { using StoragePolicyPtr = std::shared_ptr<StoragePolicy>; +Status get_remote_file_system(int64_t storage_policy_id, std::shared_ptr<io::RemoteFileSystem>* fs); + // return nullptr if not found StoragePolicyPtr get_storage_policy(int64_t id); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index f0ea777713..2a2ab4acf9 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -38,6 +38,7 @@ #include <set> #include <shared_mutex> #include <string> +#include <unordered_set> #include "agent/utils.h" #include "common/config.h" @@ -1709,23 +1710,14 @@ Status Tablet::cooldown() { if (cooldown_replica_id <= 0) { // wait for FE to push cooldown conf return Status::InternalError("invalid cooldown_replica_id"); } - auto storage_policy = get_storage_policy(storage_policy_id()); - if (storage_policy == nullptr) { - return Status::InternalError("could not find storage_policy, storage_policy_id={}", - storage_policy_id()); - } - auto resource = get_storage_resource(storage_policy->resource_id); - auto dest_fs = std::static_pointer_cast<io::RemoteFileSystem>(resource.fs); - if (dest_fs == nullptr) { - return Status::InternalError("could not find resource, resouce_id={}", - storage_policy->resource_id); - } - DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id); - DCHECK(dest_fs->type() != io::FileSystemType::LOCAL); + + std::shared_ptr<io::RemoteFileSystem> fs; + RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs)); + if (cooldown_replica_id == replica_id()) { - RETURN_IF_ERROR(_cooldown_data(dest_fs)); + RETURN_IF_ERROR(_cooldown_data(fs)); } else { - RETURN_IF_ERROR(_follow_cooldowned_data(dest_fs.get(), cooldown_replica_id)); + RETURN_IF_ERROR(_follow_cooldowned_data(fs, cooldown_replica_id)); } return Status::OK(); } @@ -1768,7 +1760,7 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_ UniqueId cooldown_meta_id = UniqueId::gen_uid(); // upload cooldowned rowset meta to remote fs - RETURN_IF_ERROR(_write_cooldown_meta(dest_fs.get(), cooldown_meta_id, new_rowset_meta.get())); + RETURN_IF_ERROR(write_cooldown_meta(dest_fs, cooldown_meta_id, new_rowset_meta, {})); RowsetSharedPtr new_rowset; RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta, &new_rowset); @@ -1789,8 +1781,8 @@ Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_ return Status::OK(); } -Status Tablet::_read_cooldown_meta(io::RemoteFileSystem* fs, int64_t cooldown_replica_id, - TabletMetaPB* tablet_meta_pb) { +Status Tablet::_read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs, + int64_t cooldown_replica_id, TabletMetaPB* tablet_meta_pb) { std::string remote_meta_path = BetaRowset::remote_tablet_meta_path(tablet_id(), cooldown_replica_id); IOContext io_ctx; @@ -1807,29 +1799,53 @@ Status Tablet::_read_cooldown_meta(io::RemoteFileSystem* fs, int64_t cooldown_re return Status::OK(); } -Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId cooldown_meta_id, - RowsetMeta* new_rs_meta) { +Status Tablet::write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs, + UniqueId cooldown_meta_id, + const RowsetMetaSharedPtr& new_rs_meta, + const std::vector<RowsetMetaSharedPtr>& to_deletes) { + std::unordered_set<Version, HashOfVersion> to_delete_set; + for (auto& rs_meta : to_deletes) { + to_delete_set.emplace(rs_meta->version()); + } + std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas; { std::shared_lock meta_rlock(_meta_lock); for (auto& rs_meta : _tablet_meta->all_rs_metas()) { if (!rs_meta->is_local()) { - cooldowned_rs_metas.push_back(rs_meta); + if (to_delete_set.find(rs_meta->version()) != to_delete_set.end()) { + continue; + } + cooldowned_rs_metas.emplace_back(rs_meta); } } } + cooldowned_rs_metas.emplace_back(new_rs_meta); std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(), RowsetMeta::comparator); - if (UNLIKELY(!cooldowned_rs_metas.empty() && - new_rs_meta->start_version() != cooldowned_rs_metas.back()->end_version() + 1)) { - return Status::InternalError("version not continuous"); + + // check_version_continuity + if (!cooldowned_rs_metas.empty()) { + RowsetMetaSharedPtr prev_rowset_meta = cooldowned_rs_metas.front(); + for (size_t i = 1; i < cooldowned_rs_metas.size(); ++i) { + RowsetMetaSharedPtr rowset_meta = cooldowned_rs_metas[i]; + if (rowset_meta->start_version() != prev_rowset_meta->end_version() + 1) { + LOG(WARNING) << "There are missed versions among rowsets. " + << "prev_rowset_meta version=" << prev_rowset_meta->start_version() + << "-" << prev_rowset_meta->end_version() + << ", rowset_meta version=" << rowset_meta->start_version() << "-" + << rowset_meta->end_version(); + return Status::Error<CUMULATIVE_MISS_VERSION>(); + } + prev_rowset_meta = rowset_meta; + } } + TabletMetaPB tablet_meta_pb; auto rs_metas = tablet_meta_pb.mutable_rs_metas(); - rs_metas->Reserve(cooldowned_rs_metas.size() + 1); + rs_metas->Reserve(cooldowned_rs_metas.size()); for (auto& rs_meta : cooldowned_rs_metas) { rs_metas->Add(rs_meta->get_rowset_pb()); } - rs_metas->Add(new_rs_meta->get_rowset_pb()); tablet_meta_pb.mutable_cooldown_meta_id()->set_hi(cooldown_meta_id.hi); tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo); @@ -1842,7 +1858,8 @@ Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId cooldown_ return tablet_meta_writer->close(); } -Status Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldown_replica_id) { +Status Tablet::_follow_cooldowned_data(const std::shared_ptr<io::RemoteFileSystem>& fs, + int64_t cooldown_replica_id) { LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id() << " cooldown_replica_id=" << cooldown_replica_id << " local replica=" << replica_id(); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index ed8ada05c7..adcc6368f8 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -391,6 +391,10 @@ public: } } + Status write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs, + UniqueId cooldown_meta_id, const RowsetMetaSharedPtr& new_rs_meta, + const std::vector<RowsetMetaSharedPtr>& to_deletes); + private: Status _init_once_action(); void _print_missed_versions(const std::vector<Version>& missed_versions) const; @@ -431,11 +435,10 @@ private: // begin cooldown functions //////////////////////////////////////////////////////////////////////////// Status _cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& dest_fs); - Status _follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldown_replica_id); - Status _read_cooldown_meta(io::RemoteFileSystem* fs, int64_t cooldown_replica_id, - TabletMetaPB* tablet_meta_pb); - Status _write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId cooldown_meta_id, - RowsetMeta* new_rs_meta); + Status _follow_cooldowned_data(const std::shared_ptr<io::RemoteFileSystem>& fs, + int64_t cooldown_replica_id); + Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs, + int64_t cooldown_replica_id, TabletMetaPB* tablet_meta_pb); //////////////////////////////////////////////////////////////////////////// // end cooldown functions //////////////////////////////////////////////////////////////////////////// diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java index 8de447050c..7fe4556dec 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java @@ -190,7 +190,7 @@ public class TabletInvertedIndex { } } - if (Config.enable_storage_policy) { + if (Config.enable_storage_policy && backendTabletInfo.isSetCooldownReplicaId()) { handleCooldownConf(tabletMeta, backendTabletInfo, cooldownConfToPush, cooldownConfToUpdate); replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId()); @@ -340,9 +340,6 @@ public class TabletInvertedIndex { private void handleCooldownConf(TabletMeta tabletMeta, TTabletInfo beTabletInfo, List<CooldownConf> cooldownConfToPush, List<CooldownConf> cooldownConfToUpdate) { - if (!beTabletInfo.isSetCooldownReplicaId()) { - return; - } Tablet tablet; try { OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId()) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org