This is an automated email from the ASF dual-hosted git repository.

pengxiangyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a1e3d3435 [fix](cooldown)Fix bug for single cooldown compaction, add remote meta (#16812)
6a1e3d3435 is described below

commit 6a1e3d3435ca95abbe7a62ba071a909041dba60d
Author: pengxiangyu <diablo...@163.com>
AuthorDate: Fri Feb 17 15:13:06 2023 +0800

    [fix](cooldown)Fix bug for single cooldown compaction, add remote meta (#16812)
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
    
    * fix bug, add remote meta for compaction
---
 be/src/olap/cold_data_compaction.cpp               | 15 ++++-
 be/src/olap/storage_policy.cpp                     | 18 ++++++
 be/src/olap/storage_policy.h                       |  3 +
 be/src/olap/tablet.cpp                             | 71 ++++++++++++++--------
 be/src/olap/tablet.h                               | 13 ++--
 .../apache/doris/catalog/TabletInvertedIndex.java  |  5 +-
 6 files changed, 88 insertions(+), 37 deletions(-)

diff --git a/be/src/olap/cold_data_compaction.cpp b/be/src/olap/cold_data_compaction.cpp
index 4b06ee7616..9f24c9c170 100644
--- a/be/src/olap/cold_data_compaction.cpp
+++ b/be/src/olap/cold_data_compaction.cpp
@@ -19,6 +19,7 @@
 
 #include "common/compiler_util.h"
 #include "olap/compaction.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset.h"
 
 namespace doris {
@@ -60,14 +61,26 @@ Status ColdDataCompaction::pick_rowsets_to_compact() {
 }
 
 Status ColdDataCompaction::modify_rowsets() {
+    UniqueId cooldown_meta_id = UniqueId::gen_uid();
+
+    // write remote tablet meta
+    std::shared_ptr<io::RemoteFileSystem> fs;
+    RETURN_IF_ERROR(get_remote_file_system(_tablet->storage_policy_id(), &fs));
+    std::vector<RowsetMetaSharedPtr> to_deletes;
+    for (auto& rs : _input_rowsets) {
+        to_deletes.emplace_back(rs->rowset_meta());
+    }
+    RETURN_IF_ERROR(_tablet->write_cooldown_meta(fs, cooldown_meta_id,
+                                                 
_output_rowset->rowset_meta(), to_deletes));
     {
         std::lock_guard wlock(_tablet->get_header_lock());
         // Merged cooldowned rowsets MUST NOT be managed by version graph, 
they will be reclaimed by `remove_unused_remote_files`.
         _tablet->delete_rowsets(_input_rowsets, false);
         _tablet->add_rowsets({_output_rowset});
         // TODO(plat1ko): process primary key
-        _tablet->tablet_meta()->set_cooldown_meta_id(UniqueId::gen_uid());
+        _tablet->tablet_meta()->set_cooldown_meta_id(cooldown_meta_id);
     }
+
     {
         std::shared_lock rlock(_tablet->get_header_lock());
         _tablet->save_meta();
diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp
index c227534907..68f561397d 100644
--- a/be/src/olap/storage_policy.cpp
+++ b/be/src/olap/storage_policy.cpp
@@ -31,6 +31,24 @@ struct StoragePolicyMgr {
 
 static StoragePolicyMgr s_storage_policy_mgr;
 
+Status get_remote_file_system(int64_t storage_policy_id,
+                              std::shared_ptr<io::RemoteFileSystem>* fs) {
+    auto storage_policy = get_storage_policy(storage_policy_id);
+    if (storage_policy == nullptr) {
+        return Status::InternalError("could not find storage_policy, 
storage_policy_id={}",
+                                     storage_policy_id);
+    }
+    auto resource = get_storage_resource(storage_policy->resource_id);
+    *fs = std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+    if (*fs == nullptr) {
+        return Status::InternalError("could not find resource, resouce_id={}",
+                                     storage_policy->resource_id);
+    }
+    DCHECK(atol((*fs)->id().c_str()) == storage_policy->resource_id);
+    DCHECK((*fs)->type() != io::FileSystemType::LOCAL);
+    return Status::OK();
+}
+
 StoragePolicyPtr get_storage_policy(int64_t id) {
     std::lock_guard lock(s_storage_policy_mgr.mtx);
     if (auto it = s_storage_policy_mgr.map.find(id); it != 
s_storage_policy_mgr.map.end()) {
diff --git a/be/src/olap/storage_policy.h b/be/src/olap/storage_policy.h
index 4ddac3d36f..02d86603fa 100644
--- a/be/src/olap/storage_policy.h
+++ b/be/src/olap/storage_policy.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "io/fs/file_system.h"
+#include "io/fs/remote_file_system.h"
 
 namespace doris {
 
@@ -37,6 +38,8 @@ struct StoragePolicy {
 
 using StoragePolicyPtr = std::shared_ptr<StoragePolicy>;
 
+Status get_remote_file_system(int64_t storage_policy_id, 
std::shared_ptr<io::RemoteFileSystem>* fs);
+
 // return nullptr if not found
 StoragePolicyPtr get_storage_policy(int64_t id);
 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index f0ea777713..2a2ab4acf9 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -38,6 +38,7 @@
 #include <set>
 #include <shared_mutex>
 #include <string>
+#include <unordered_set>
 
 #include "agent/utils.h"
 #include "common/config.h"
@@ -1709,23 +1710,14 @@ Status Tablet::cooldown() {
     if (cooldown_replica_id <= 0) { // wait for FE to push cooldown conf
         return Status::InternalError("invalid cooldown_replica_id");
     }
-    auto storage_policy = get_storage_policy(storage_policy_id());
-    if (storage_policy == nullptr) {
-        return Status::InternalError("could not find storage_policy, 
storage_policy_id={}",
-                                     storage_policy_id());
-    }
-    auto resource = get_storage_resource(storage_policy->resource_id);
-    auto dest_fs = std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
-    if (dest_fs == nullptr) {
-        return Status::InternalError("could not find resource, resouce_id={}",
-                                     storage_policy->resource_id);
-    }
-    DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
-    DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+    std::shared_ptr<io::RemoteFileSystem> fs;
+    RETURN_IF_ERROR(get_remote_file_system(storage_policy_id(), &fs));
+
     if (cooldown_replica_id == replica_id()) {
-        RETURN_IF_ERROR(_cooldown_data(dest_fs));
+        RETURN_IF_ERROR(_cooldown_data(fs));
     } else {
-        RETURN_IF_ERROR(_follow_cooldowned_data(dest_fs.get(), 
cooldown_replica_id));
+        RETURN_IF_ERROR(_follow_cooldowned_data(fs, cooldown_replica_id));
     }
     return Status::OK();
 }
@@ -1768,7 +1760,7 @@ Status Tablet::_cooldown_data(const 
std::shared_ptr<io::RemoteFileSystem>& dest_
     UniqueId cooldown_meta_id = UniqueId::gen_uid();
 
     // upload cooldowned rowset meta to remote fs
-    RETURN_IF_ERROR(_write_cooldown_meta(dest_fs.get(), cooldown_meta_id, 
new_rowset_meta.get()));
+    RETURN_IF_ERROR(write_cooldown_meta(dest_fs, cooldown_meta_id, 
new_rowset_meta, {}));
 
     RowsetSharedPtr new_rowset;
     RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta, 
&new_rowset);
@@ -1789,8 +1781,8 @@ Status Tablet::_cooldown_data(const 
std::shared_ptr<io::RemoteFileSystem>& dest_
     return Status::OK();
 }
 
-Status Tablet::_read_cooldown_meta(io::RemoteFileSystem* fs, int64_t 
cooldown_replica_id,
-                                   TabletMetaPB* tablet_meta_pb) {
+Status Tablet::_read_cooldown_meta(const 
std::shared_ptr<io::RemoteFileSystem>& fs,
+                                   int64_t cooldown_replica_id, TabletMetaPB* 
tablet_meta_pb) {
     std::string remote_meta_path =
             BetaRowset::remote_tablet_meta_path(tablet_id(), 
cooldown_replica_id);
     IOContext io_ctx;
@@ -1807,29 +1799,53 @@ Status 
Tablet::_read_cooldown_meta(io::RemoteFileSystem* fs, int64_t cooldown_re
     return Status::OK();
 }
 
-Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId 
cooldown_meta_id,
-                                    RowsetMeta* new_rs_meta) {
+Status Tablet::write_cooldown_meta(const 
std::shared_ptr<io::RemoteFileSystem>& fs,
+                                   UniqueId cooldown_meta_id,
+                                   const RowsetMetaSharedPtr& new_rs_meta,
+                                   const std::vector<RowsetMetaSharedPtr>& 
to_deletes) {
+    std::unordered_set<Version, HashOfVersion> to_delete_set;
+    for (auto& rs_meta : to_deletes) {
+        to_delete_set.emplace(rs_meta->version());
+    }
+
     std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas;
     {
         std::shared_lock meta_rlock(_meta_lock);
         for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
             if (!rs_meta->is_local()) {
-                cooldowned_rs_metas.push_back(rs_meta);
+                if (to_delete_set.find(rs_meta->version()) != 
to_delete_set.end()) {
+                    continue;
+                }
+                cooldowned_rs_metas.emplace_back(rs_meta);
             }
         }
     }
+    cooldowned_rs_metas.emplace_back(new_rs_meta);
     std::sort(cooldowned_rs_metas.begin(), cooldowned_rs_metas.end(), 
RowsetMeta::comparator);
-    if (UNLIKELY(!cooldowned_rs_metas.empty() &&
-                 new_rs_meta->start_version() != 
cooldowned_rs_metas.back()->end_version() + 1)) {
-        return Status::InternalError("version not continuous");
+
+    // check_version_continuity
+    if (!cooldowned_rs_metas.empty()) {
+        RowsetMetaSharedPtr prev_rowset_meta = cooldowned_rs_metas.front();
+        for (size_t i = 1; i < cooldowned_rs_metas.size(); ++i) {
+            RowsetMetaSharedPtr rowset_meta = cooldowned_rs_metas[i];
+            if (rowset_meta->start_version() != 
prev_rowset_meta->end_version() + 1) {
+                LOG(WARNING) << "There are missed versions among rowsets. "
+                             << "prev_rowset_meta version=" << 
prev_rowset_meta->start_version()
+                             << "-" << prev_rowset_meta->end_version()
+                             << ", rowset_meta version=" << 
rowset_meta->start_version() << "-"
+                             << rowset_meta->end_version();
+                return Status::Error<CUMULATIVE_MISS_VERSION>();
+            }
+            prev_rowset_meta = rowset_meta;
+        }
     }
+
     TabletMetaPB tablet_meta_pb;
     auto rs_metas = tablet_meta_pb.mutable_rs_metas();
-    rs_metas->Reserve(cooldowned_rs_metas.size() + 1);
+    rs_metas->Reserve(cooldowned_rs_metas.size());
     for (auto& rs_meta : cooldowned_rs_metas) {
         rs_metas->Add(rs_meta->get_rowset_pb());
     }
-    rs_metas->Add(new_rs_meta->get_rowset_pb());
     tablet_meta_pb.mutable_cooldown_meta_id()->set_hi(cooldown_meta_id.hi);
     tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo);
 
@@ -1842,7 +1858,8 @@ Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* 
fs, UniqueId cooldown_
     return tablet_meta_writer->close();
 }
 
-Status Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t 
cooldown_replica_id) {
+Status Tablet::_follow_cooldowned_data(const 
std::shared_ptr<io::RemoteFileSystem>& fs,
+                                       int64_t cooldown_replica_id) {
     LOG(INFO) << "try to follow cooldowned data. tablet_id=" << tablet_id()
               << " cooldown_replica_id=" << cooldown_replica_id
               << " local replica=" << replica_id();
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index ed8ada05c7..adcc6368f8 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -391,6 +391,10 @@ public:
         }
     }
 
+    Status write_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
+                               UniqueId cooldown_meta_id, const 
RowsetMetaSharedPtr& new_rs_meta,
+                               const std::vector<RowsetMetaSharedPtr>& 
to_deletes);
+
 private:
     Status _init_once_action();
     void _print_missed_versions(const std::vector<Version>& missed_versions) 
const;
@@ -431,11 +435,10 @@ private:
     // begin cooldown functions
     
////////////////////////////////////////////////////////////////////////////
     Status _cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& 
dest_fs);
-    Status _follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t 
cooldown_replica_id);
-    Status _read_cooldown_meta(io::RemoteFileSystem* fs, int64_t 
cooldown_replica_id,
-                               TabletMetaPB* tablet_meta_pb);
-    Status _write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId 
cooldown_meta_id,
-                                RowsetMeta* new_rs_meta);
+    Status _follow_cooldowned_data(const 
std::shared_ptr<io::RemoteFileSystem>& fs,
+                                   int64_t cooldown_replica_id);
+    Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
+                               int64_t cooldown_replica_id, TabletMetaPB* 
tablet_meta_pb);
     
////////////////////////////////////////////////////////////////////////////
     // end cooldown functions
     
////////////////////////////////////////////////////////////////////////////
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 8de447050c..7fe4556dec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -190,7 +190,7 @@ public class TabletInvertedIndex {
                                 }
                             }
 
-                            if (Config.enable_storage_policy) {
+                            if (Config.enable_storage_policy && 
backendTabletInfo.isSetCooldownReplicaId()) {
                                 handleCooldownConf(tabletMeta, 
backendTabletInfo, cooldownConfToPush,
                                         cooldownConfToUpdate);
                                 
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
@@ -340,9 +340,6 @@ public class TabletInvertedIndex {
 
     private void handleCooldownConf(TabletMeta tabletMeta, TTabletInfo 
beTabletInfo,
             List<CooldownConf> cooldownConfToPush, List<CooldownConf> 
cooldownConfToUpdate) {
-        if (!beTabletInfo.isSetCooldownReplicaId()) {
-            return;
-        }
         Tablet tablet;
         try {
             OlapTable table = (OlapTable) 
Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to