This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 51ccdfa4b57 branch-3.0: [enhancement](schema-change) Cloud schema change do clean up when job failed #48426 (#48897)
51ccdfa4b57 is described below

commit 51ccdfa4b57eed0a6f72d6a9511028040ac0ecf3
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 11 17:36:00 2025 +0800

    branch-3.0: [enhancement](schema-change) Cloud schema change do clean up when job failed #48426 (#48897)
    
    Cherry-picked from #48426
    
    Co-authored-by: Siyang Tang <tangsiyang2...@foxmail.com>
---
 be/src/agent/task_worker_pool.cpp        | 80 +++++++++++++++-----------------
 be/src/cloud/cloud_schema_change_job.cpp | 12 +++++
 be/src/cloud/cloud_schema_change_job.h   |  2 +
 be/src/cloud/cloud_tablet.cpp            | 28 ++++-------
 be/src/olap/rowset/rowset.cpp            |  9 ++++
 5 files changed, 68 insertions(+), 63 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index da6184ad6fd..86ece5125ed 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -268,39 +268,46 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
     // Do not need to adjust delete success or not
     // Because if delete failed create rollup will failed
     TTabletId new_tablet_id = 0;
-    if (status.ok()) {
-        new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
-        auto mem_tracker = MemTrackerLimiter::create_shared(
-                MemTrackerLimiter::Type::SCHEMA_CHANGE,
-                
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
-                            
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
-                            
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
-                            
engine.memory_limitation_bytes_per_thread_for_schema_change()));
-        SCOPED_ATTACH_TASK(mem_tracker);
-        DorisMetrics::instance()->create_rollup_requests_total->increment(1);
-        Status res = Status::OK();
-        try {
-            LOG_INFO("start {}", process_name)
-                    .tag("signature", agent_task_req.signature)
-                    .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
-                    .tag("new_tablet_id", new_tablet_id)
-                    .tag("mem_limit",
-                         
engine.memory_limitation_bytes_per_thread_for_schema_change());
-            DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
-            CloudSchemaChangeJob job(engine,
-                                     
std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
-                                     
agent_task_req.alter_tablet_req_v2.expiration);
-            status = 
job.process_alter_tablet(agent_task_req.alter_tablet_req_v2);
-        } catch (const Exception& e) {
-            status = e.to_status();
-        }
-        if (!status.ok()) {
-            
DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
-        }
-    }
+    new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id;
+    auto mem_tracker = MemTrackerLimiter::create_shared(
+            MemTrackerLimiter::Type::SCHEMA_CHANGE,
+            fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
+                        
std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id),
+                        
std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id),
+                        
engine.memory_limitation_bytes_per_thread_for_schema_change()));
+    SCOPED_ATTACH_TASK(mem_tracker);
+    DorisMetrics::instance()->create_rollup_requests_total->increment(1);
+
+    LOG_INFO("start {}", process_name)
+            .tag("signature", agent_task_req.signature)
+            .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
+            .tag("new_tablet_id", new_tablet_id)
+            .tag("mem_limit", 
engine.memory_limitation_bytes_per_thread_for_schema_change());
+    DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id);
+    CloudSchemaChangeJob job(engine, 
std::to_string(agent_task_req.alter_tablet_req_v2.job_id),
+                             agent_task_req.alter_tablet_req_v2.expiration);
+    status = [&]() {
+        HANDLE_EXCEPTION_IF_CATCH_EXCEPTION(
+                job.process_alter_tablet(agent_task_req.alter_tablet_req_v2),
+                [&](const doris::Exception& ex) {
+                    
DorisMetrics::instance()->create_rollup_requests_failed->increment(1);
+                    job.clean_up_on_failed();
+                });
+        return Status::OK();
+    }();
 
     if (status.ok()) {
         increase_report_version();
+        LOG_INFO("successfully {}", process_name)
+                .tag("signature", agent_task_req.signature)
+                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
+                .tag("new_tablet_id", new_tablet_id);
+    } else {
+        LOG_WARNING("failed to {}", process_name)
+                .tag("signature", agent_task_req.signature)
+                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
+                .tag("new_tablet_id", new_tablet_id)
+                .error(status);
     }
 
     // Return result to fe
@@ -308,19 +315,6 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age
     finish_task_request->__set_report_version(s_report_version);
     finish_task_request->__set_task_type(task_type);
     finish_task_request->__set_signature(signature);
-
-    if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) {
-        LOG_WARNING("failed to {}", process_name)
-                .tag("signature", agent_task_req.signature)
-                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
-                .tag("new_tablet_id", new_tablet_id)
-                .error(status);
-    } else {
-        LOG_INFO("successfully {}", process_name)
-                .tag("signature", agent_task_req.signature)
-                .tag("base_tablet_id", 
agent_task_req.alter_tablet_req_v2.base_tablet_id)
-                .tag("new_tablet_id", new_tablet_id);
-    }
     finish_task_request->__set_task_status(status.to_thrift());
 }
 
diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp
index 1ec08acc9eb..b98dfb33a24 100644
--- a/be/src/cloud/cloud_schema_change_job.cpp
+++ b/be/src/cloud/cloud_schema_change_job.cpp
@@ -495,4 +495,16 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
     return Status::OK();
 }
 
+void CloudSchemaChangeJob::clean_up_on_failed() {
+    for (const auto& output_rs : _output_rowsets) {
+        if (output_rs.use_count() > 2) {
+            LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() << 
" has "
+                         << output_rs.use_count()
+                         << " references. File Cache won't be recycled when 
query is using it.";
+            return;
+        }
+        output_rs->clear_cache();
+    }
+}
+
 } // namespace doris
diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h
index c77aae48570..dee71cd3104 100644
--- a/be/src/cloud/cloud_schema_change_job.h
+++ b/be/src/cloud/cloud_schema_change_job.h
@@ -36,6 +36,8 @@ public:
     // This method is idempotent for a same request.
     Status process_alter_tablet(const TAlterTabletReqV2& request);
 
+    void clean_up_on_failed();
+
 private:
     Status _convert_historical_rowsets(const SchemaChangeParams& sc_params,
                                        cloud::TabletJobInfoPB& job);
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index bda4e8ea1a3..82a5927c1c4 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -33,6 +33,7 @@
 #include "cloud/cloud_meta_mgr.h"
 #include "cloud/cloud_storage_engine.h"
 #include "cloud/cloud_tablet_mgr.h"
+#include "common/config.h"
 #include "common/logging.h"
 #include "io/cache/block_file_cache_downloader.h"
 #include "io/cache/block_file_cache_factory.h"
@@ -452,27 +453,14 @@ void CloudTablet::clear_cache() {
 }
 
 void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& 
rowsets) {
-    for (auto& rs : rowsets) {
-        // Clear cached opened segments and inverted index cache in memory
-        rs->clear_cache();
-    }
-
-    if (config::enable_file_cache) {
-        for (const auto& rs : rowsets) {
-            // rowsets and tablet._rs_version_map each hold a rowset 
shared_ptr, so at this point, the reference count of the shared_ptr is at least 
2.
-            if (rs.use_count() > 2) {
-                LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " 
has "
-                             << rs.use_count()
-                             << " references. File Cache won't be recycled 
when query is using it.";
-                continue;
-            }
-            for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
-                // TODO: Segment::file_cache_key
-                auto file_key = 
Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
-                auto* file_cache = 
io::FileCacheFactory::instance()->get_by_path(file_key);
-                file_cache->remove_if_cached_async(file_key);
-            }
+    for (const auto& rs : rowsets) {
+        // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, 
so at this point, the reference count of the shared_ptr is at least 2.
+        if (rs.use_count() > 2) {
+            LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has 
" << rs.use_count()
+                         << " references. File Cache won't be recycled when 
query is using it.";
+            return;
         }
+        rs->clear_cache();
     }
 }
 
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index ac3a2a7a1dc..3b86504090d 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -19,6 +19,8 @@
 
 #include <gen_cpp/olap_file.pb.h>
 
+#include "common/config.h"
+#include "io/cache/block_file_cache_factory.h"
 #include "olap/olap_define.h"
 #include "olap/segment_loader.h"
 #include "olap/tablet_schema.h"
@@ -120,6 +122,13 @@ void Rowset::clear_cache() {
         SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1));
         clear_inverted_index_cache();
     }
+    if (config::enable_file_cache) {
+        for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
+            auto file_key = 
segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id);
+            auto* file_cache = 
io::FileCacheFactory::instance()->get_by_path(file_key);
+            file_cache->remove_if_cached_async(file_key);
+        }
+    }
 }
 
 Result<std::string> Rowset::segment_path(int64_t seg_id) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to