This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 51ccdfa4b57 branch-3.0: [enhancement](schema-change) Cloud schema change do clean up when job failed #48426 (#48897) 51ccdfa4b57 is described below commit 51ccdfa4b57eed0a6f72d6a9511028040ac0ecf3 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Tue Mar 11 17:36:00 2025 +0800 branch-3.0: [enhancement](schema-change) Cloud schema change do clean up when job failed #48426 (#48897) Cherry-picked from #48426 Co-authored-by: Siyang Tang <tangsiyang2...@foxmail.com> --- be/src/agent/task_worker_pool.cpp | 80 +++++++++++++++----------------- be/src/cloud/cloud_schema_change_job.cpp | 12 +++++ be/src/cloud/cloud_schema_change_job.h | 2 + be/src/cloud/cloud_tablet.cpp | 28 ++++------- be/src/olap/rowset/rowset.cpp | 9 ++++ 5 files changed, 68 insertions(+), 63 deletions(-) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index da6184ad6fd..86ece5125ed 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -268,39 +268,46 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age // Do not need to adjust delete success or not // Because if delete failed create rollup will failed TTabletId new_tablet_id = 0; - if (status.ok()) { - new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id; - auto mem_tracker = MemTrackerLimiter::create_shared( - MemTrackerLimiter::Type::SCHEMA_CHANGE, - fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", - std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id), - std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id), - engine.memory_limitation_bytes_per_thread_for_schema_change())); - SCOPED_ATTACH_TASK(mem_tracker); - DorisMetrics::instance()->create_rollup_requests_total->increment(1); - Status res = Status::OK(); - try { - LOG_INFO("start {}", process_name) - .tag("signature", agent_task_req.signature) - .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) - .tag("new_tablet_id", new_tablet_id) - .tag("mem_limit", - engine.memory_limitation_bytes_per_thread_for_schema_change()); - DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id); - CloudSchemaChangeJob job(engine, - std::to_string(agent_task_req.alter_tablet_req_v2.job_id), - agent_task_req.alter_tablet_req_v2.expiration); - status = job.process_alter_tablet(agent_task_req.alter_tablet_req_v2); - } catch (const Exception& e) { - status = e.to_status(); - } - if (!status.ok()) { - DorisMetrics::instance()->create_rollup_requests_failed->increment(1); - } - } + new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id; + auto mem_tracker = MemTrackerLimiter::create_shared( + MemTrackerLimiter::Type::SCHEMA_CHANGE, + fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", + std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id), + std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id), + engine.memory_limitation_bytes_per_thread_for_schema_change())); + SCOPED_ATTACH_TASK(mem_tracker); + DorisMetrics::instance()->create_rollup_requests_total->increment(1); + + LOG_INFO("start {}", process_name) + .tag("signature", agent_task_req.signature) + .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) + .tag("new_tablet_id", new_tablet_id) + .tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change()); + DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id); + CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id), + agent_task_req.alter_tablet_req_v2.expiration); + status = [&]() { + HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( + job.process_alter_tablet(agent_task_req.alter_tablet_req_v2), + [&](const doris::Exception& ex) { + DorisMetrics::instance()->create_rollup_requests_failed->increment(1); + job.clean_up_on_failed(); + }); + return Status::OK(); + }(); if (status.ok()) { increase_report_version(); + LOG_INFO("successfully {}", process_name) + .tag("signature", agent_task_req.signature) + .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) + .tag("new_tablet_id", new_tablet_id); + } else { + LOG_WARNING("failed to {}", process_name) + .tag("signature", agent_task_req.signature) + .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) + .tag("new_tablet_id", new_tablet_id) + .error(status); } // Return result to fe @@ -308,19 +315,6 @@ void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& age finish_task_request->__set_report_version(s_report_version); finish_task_request->__set_task_type(task_type); finish_task_request->__set_signature(signature); - - if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) { - LOG_WARNING("failed to {}", process_name) - .tag("signature", agent_task_req.signature) - .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) - .tag("new_tablet_id", new_tablet_id) - .error(status); - } else { - LOG_INFO("successfully {}", process_name) - .tag("signature", agent_task_req.signature) - .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) - .tag("new_tablet_id", new_tablet_id); - } finish_task_request->__set_task_status(status.to_thrift()); } diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index 1ec08acc9eb..b98dfb33a24 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -495,4 +495,16 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, return Status::OK(); } +void CloudSchemaChangeJob::clean_up_on_failed() { + for (const auto& output_rs : _output_rowsets) { + if (output_rs.use_count() > 2) { + LOG(WARNING) << "Rowset " << output_rs->rowset_id().to_string() << " has " + << output_rs.use_count() + << " references. File Cache won't be recycled when query is using it."; + return; + } + output_rs->clear_cache(); + } +} + } // namespace doris diff --git a/be/src/cloud/cloud_schema_change_job.h b/be/src/cloud/cloud_schema_change_job.h index c77aae48570..dee71cd3104 100644 --- a/be/src/cloud/cloud_schema_change_job.h +++ b/be/src/cloud/cloud_schema_change_job.h @@ -36,6 +36,8 @@ public: // This method is idempotent for a same request. Status process_alter_tablet(const TAlterTabletReqV2& request); + void clean_up_on_failed(); + private: Status _convert_historical_rowsets(const SchemaChangeParams& sc_params, cloud::TabletJobInfoPB& job); diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index bda4e8ea1a3..82a5927c1c4 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -33,6 +33,7 @@ #include "cloud/cloud_meta_mgr.h" #include "cloud/cloud_storage_engine.h" #include "cloud/cloud_tablet_mgr.h" +#include "common/config.h" #include "common/logging.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" @@ -452,27 +453,14 @@ void CloudTablet::clear_cache() { } void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets) { - for (auto& rs : rowsets) { - // Clear cached opened segments and inverted index cache in memory - rs->clear_cache(); - } - - if (config::enable_file_cache) { - for (const auto& rs : rowsets) { - // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2. - if (rs.use_count() > 2) { - LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " - << rs.use_count() - << " references. File Cache won't be recycled when query is using it."; - continue; - } - for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) { - // TODO: Segment::file_cache_key - auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id); - auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); - file_cache->remove_if_cached_async(file_key); - } + for (const auto& rs : rowsets) { + // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2. + if (rs.use_count() > 2) { + LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count() + << " references. File Cache won't be recycled when query is using it."; + return; } + rs->clear_cache(); } } diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp index ac3a2a7a1dc..3b86504090d 100644 --- a/be/src/olap/rowset/rowset.cpp +++ b/be/src/olap/rowset/rowset.cpp @@ -19,6 +19,8 @@ #include <gen_cpp/olap_file.pb.h> +#include "common/config.h" +#include "io/cache/block_file_cache_factory.h" #include "olap/olap_define.h" #include "olap/segment_loader.h" #include "olap/tablet_schema.h" @@ -120,6 +122,13 @@ void Rowset::clear_cache() { SCOPED_SIMPLE_TRACE_IF_TIMEOUT(std::chrono::seconds(1)); clear_inverted_index_cache(); } + if (config::enable_file_cache) { + for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { + auto file_key = segment_v2::Segment::file_cache_key(rowset_id().to_string(), seg_id); + auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); + file_cache->remove_if_cached_async(file_key); + } + } } Result<std::string> Rowset::segment_path(int64_t seg_id) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org